
pyspark - java.io.FileNotFoundException in Spark structured streaming job

I am trying to run a Spark Structured Streaming job that reads CSV files from a local directory and loads them into HDFS in Parquet format.

I start a pyspark job as follows:

pyspark2 --master yarn --executor-memory 8G --driver-memory 8G

The code looks as follows:

from pyspark.sql.types import StructType

sch = StructType(...)  # actual schema elided in the original post

(spark.readStream
    .format("csv")
    .schema(sch)
    .option("header", True)
    .option("delimiter", ",")
    .load("<Load_path>")
    .writeStream
    .format("parquet")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .option("path", "<Hdfs_path>")
    .option("checkpointLocation", "<Checkpoint Loc>")
    .start())
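
The schema itself is elided above; purely for illustration, a hypothetical two-column schema (field names assumed, not from the original post) could look like this:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Hypothetical example schema; the real fields are not shown in the post.
sch = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])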

The load path is like file:////home/pardeep/file2, where file2 is a directory name (not a file).

It runs fine at the start, but after more CSV files are added to the source folder, it gives the error below:

Caused by: java.io.FileNotFoundException: File file:<file>.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)

The error does not always come right after the first file is added; sometimes it appears after the first file, sometimes after the second.

There is another job moving files into this folder: it writes them to a temp folder and then moves them into this one.
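
For context, the file source expects files to appear atomically in the monitored directory, and a move is atomic only when the temp folder and the target folder are on the same filesystem. A minimal sketch of such a mover (hypothetical, not the actual job from the post):

import os

def publish(tmp_path, watch_dir):
    # os.rename is atomic when src and dst are on the same filesystem;
    # a cross-filesystem move degrades to copy+delete, which can expose
    # a partially written or already-deleted file to the streaming job.
    dst = os.path.join(watch_dir, os.path.basename(tmp_path))
    os.rename(tmp_path, dst)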

At the start there are already some files in the directory, and new files keep arriving continuously (every 2-3 minutes). I am not sure how to do the refresh (what is the table name?) or when to do it, because this is a streaming job.
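
As a side note on the error message: REFRESH TABLE invalidates cached table metadata and does not directly apply to a file-based stream. One knob that is sometimes suggested for this class of error (an assumption here, not confirmed in this thread; check your Spark version) is:

# Keep the batch running when a listed file has disappeared before it
# is read, instead of failing with FileNotFoundException (assumption:
# supported by the Spark version in use).
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")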



1 Reply

Waiting for an expert to reply.
