I am trying to run a Spark Structured Streaming job that reads CSV files from a local directory and loads them into HDFS in Parquet format.
I start the pyspark job as follows:
pyspark2 --master yarn --executor-memory 8G --driver-memory 8G
The code looks as follows:
from pyspark.sql.types import StructType

sch = StructType(...)

query = (spark.readStream
    .format("csv")
    .schema(sch)
    .option("header", True)
    .option("delimiter", ",")
    .load("<Load_path>")
    .writeStream
    .format("parquet")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .option("path", "<Hdfs_path>")
    .option("checkpointLocation", "<Checkpoint Loc>")
    .start())
The load path is like file:////home/pardeep/file2, where file2 is a directory name (not a file).
It runs fine at the start, but after more CSV files are added to the source folder, it fails with the error below:
Caused by: java.io.FileNotFoundException: File file:<file>.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
The error does not always appear after the first file is added; sometimes it comes after the first file, sometimes after the second.
Another job is moving files into this folder: it writes each file to a temp folder first and then moves it into this folder.
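For reference, the mover job follows roughly this write-then-move pattern (a minimal sketch; the directory and file names here are hypothetical, not the actual paths used by the job):

```shell
#!/bin/sh
# Hypothetical staging and watched directories (placeholders for this sketch).
TMP_DIR=/tmp/stage
SRC_DIR=/tmp/watched
mkdir -p "$TMP_DIR" "$SRC_DIR"

# 1. Write the CSV fully into the staging directory first.
printf 'id,name\n1,a\n' > "$TMP_DIR/file3.csv"

# 2. Then move it into the watched folder. Within a single filesystem,
#    mv is an atomic rename, so a reader of the watched directory never
#    sees a half-written file; across filesystems it degrades to
#    copy + delete, which is not atomic.
mv "$TMP_DIR/file3.csv" "$SRC_DIR/file3.csv"
```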
At the start there are already some files in the directory, and new files keep arriving continuously (every 2-3 minutes). I am not sure how to do the refresh (what is the table name?) or when to do it, since this is a streaming job.