Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


scala - Spark Streaming textFileStream watch output of RDD.saveAsTextFile

Running Spark 1.6.2 (YARN mode)

First, I am using some code from this post to get file names within Spark Streaming. That could be the cause of the problem, but hopefully not.

Basically, I have this first job.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// fileStream yields (byte offset, line) pairs; keep only the line text
def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir)
      .map(_._2.toString)
}

val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))

val inputDir = "hdfs:///tmp/input"
val outputDir = "hdfs:///tmp/output1"

val stream1 = getStream(ssc, inputDir)
stream1.foreachRDD(rdd => rdd.saveAsTextFile(outputDir))

ssc.start()
ssc.awaitTermination()

I also have a second job that, for this example, looks practically identical. Only the directories change: its inputDir is the first job's output directory, and it writes to a new outputDir = "hdfs:///tmp/output2".

So I have to start the second streaming job before the first one, because it needs to be watching for new files. Makes sense...

Then I start the first job and use hadoop fs -copyFromLocal to put some files into the input folder since, per the API:

Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
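The "moving" requirement quoted above amounts to a write-then-rename pattern: finish the file somewhere else, then move it into the watched directory in one step. Below is a minimal sketch of that idea on the local file system using java.nio, chosen only so it runs without a cluster. The object name AtomicDrop and the method publish are hypothetical names made up for illustration. On HDFS the analogous step would be a rename within the same file system, for example with hadoop fs -mv.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, for illustration only.
object AtomicDrop {
  // Write the content into a staging directory first, then move the finished
  // file into the watched directory, so a watcher never sees a partial file.
  // Assumption: both directories live on the same file system; otherwise
  // ATOMIC_MOVE may fail with AtomicMoveNotSupportedException.
  def publish(stagingDir: Path, watchedDir: Path, name: String, content: String): Path = {
    val staged = stagingDir.resolve(name)
    Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

The staging directory should sit outside the monitored directory, so the half-written file is never listed by the stream.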

When I try to run this, it eventually crashes with a stack trace that contains this:

17/02/01 11:48:35 INFO FileInputDStream: Finding new files took 7 ms
17/02/01 11:48:35 INFO FileInputDStream: New files at time 1485949715000 ms:
hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 355.9 KB, free 356.8 KB)
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.9 KB, free 385.7 KB)
17/02/01 11:48:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43097 (size: 28.9 KB, free: 511.1 MB)
17/02/01 11:48:35 INFO SparkContext: Created broadcast 1 from fileStream at FileStreamTransformer.scala:45
17/02/01 11:48:35 ERROR JobScheduler: Error generating jobs for time 1485949715000 ms
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/output1/_SUCCESS
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.


1 Reply


To explicitly enforce that only new files are processed, and to ensure that marker files like _SUCCESS are skipped, we can use the following overload of fileStream:

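def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
   ssc.fileStream[LongWritable, Text, TextInputFormat](
      dir,
      // Accept a file only if its name starts with neither "_" nor "."
      (path: org.apache.hadoop.fs.Path) =>
         !path.getName.startsWith("_") && !path.getName.startsWith("."),
      newFilesOnly = true
   ).map(_._2.toString) // keep the line text, drop the byte-offset key
}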

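newFilesOnly defaults to true when it is not specified, so passing it here only makes that choice explicit. The filter is what makes the difference: the default filter ignores only file names starting with ".", so a newly written _SUCCESS file is picked up unless a custom filter excludes it.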
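The filter logic is easy to get wrong, so it may help to pull it out into a plain function on the file name and check it on its own. This is a minimal sketch. The object StreamFileFilter and both method names are hypothetical, and it works on a String so that it runs without Hadoop on the classpath. Note that joining the two checks with || would accept every file name, since no name starts with both "_" and "."; the sketch uses &&.

```scala
// Hypothetical helper, for illustration only.
object StreamFileFilter {
  // True for ordinary data files; false for marker files such as _SUCCESS
  // and hidden files such as .part-00000.crc.
  def isDataFile(name: String): Boolean =
    !name.startsWith("_") && !name.startsWith(".")

  // The same two checks joined with ||, kept only to show the pitfall:
  // at least one of the checks is always true, so nothing is filtered out.
  def acceptsEverything(name: String): Boolean =
    !name.startsWith("_") || !name.startsWith(".")
}
```

To use it with fileStream, the function can be wrapped as (path: org.apache.hadoop.fs.Path) => StreamFileFilter.isDataFile(path.getName).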

