Running Spark 1.6.2 (YARN mode)
Firstly, I have some code from this post to get filenames within Spark Streaming, so that could be the issue, but hopefully not.
Basically, I have this first job.
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
ssc.fileStream[LongWritable, Text, TextInputFormat](dir)
}
val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))
val inputDir = "hdfs:///tmp/input"
val outputDir = "hdfs:///tmp/output1"
val stream1 = getStream(ssc, inputDir)
stream1.foreachRDD(rdd => rdd.saveAsTextFile(outputDir))
ssc.start()
ssc.awaitTermination()
And I also have a second job that, for this example, looks practically identical, just change around inputDir
and outputDir
, and move to a new outputDir = "hdfs:///tmp/output2"
.
Anyway, so I have to start the second streaming job before the first job because it needs to watch for new files. Makes sense...
Then, I start the first job and hadoop fs -copyFromLocal
some files into the input folder since per the API
Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
When I try to run this, it eventually crashes with a stacktrace that contains this
17/02/01 11:48:35 INFO FileInputDStream: Finding new files took 7 ms
17/02/01 11:48:35 INFO FileInputDStream: New files at time 1485949715000 ms:
hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 355.9 KB, free 356.8 KB)
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.9 KB, free 385.7 KB)
17/02/01 11:48:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43097 (size: 28.9 KB, free: 511.1 MB)
17/02/01 11:48:35 INFO SparkContext: Created broadcast 1 from fileStream at FileStreamTransformer.scala:45
17/02/01 11:48:35 ERROR JobScheduler: Error generating jobs for time 1485949715000 ms
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/output1/_SUCCESS
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…