Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


scala - Spark : Read file only if the path exists

I am trying to read the files located at a sequence of paths in Scala. Below is some sample (pseudo)code:

val paths: Seq[String] = ??? // Seq of paths, supplied by the caller
val dataframe = spark.read.parquet(paths: _*)

Now, in the above sequence, some paths exist whereas some don't. Is there any way to ignore the missing paths while reading parquet files (to avoid org.apache.spark.sql.AnalysisException: Path does not exist)?

I have tried the following and it seems to work, but it reads every existing path twice (once to test it, once for the real load), which I would like to avoid:

import scala.util.Try
val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)

I checked the options method of DataFrameReader, but it does not seem to accept anything like ignore_if_missing.

Also, these paths can be on HDFS or S3 (the Seq is passed in as a method argument). At read time I don't know which of the two a given path is on, so I can't use an S3-specific or HDFS-specific API to check whether it exists.


1 Reply


You can filter out the paths that do not exist, as in @Psidom's answer. In Spark, the best way to do so is to go through the Hadoop configuration that Spark already holds. Assuming the Spark session variable is called "spark", you can do:

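import org.apache.hadoop.fs.Path

// Hadoop configuration carried by the active Spark session
val hadoopConf = spark.sparkContext.hadoopConfiguration

// True if the path exists and is a directory. The FileSystem is resolved from
// the path's own scheme, so HDFS and S3 paths can be mixed in one Seq;
// FileSystem.get(conf) would only return the default filesystem.
def testDirExists(path: String): Boolean = {
  val p = new Path(path)
  val fs = p.getFileSystem(hadoopConf)
  fs.exists(p) && fs.getFileStatus(p).isDirectory
}

val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)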
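If you want this as a reusable piece, here is a rough sketch of how the check could be wrapped up. The names PathFilter, existingPaths and readExistingParquet are made up for illustration and are not part of Spark or Hadoop. It assumes each path's filesystem (HDFS, S3, local) is already configured in the Hadoop configuration, and it only tests for existence, not for being a directory. It returns None when no path survives the filter, because as far as I know calling spark.read.parquet with an empty list of paths fails rather than giving an empty DataFrame.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object; none of these names come from Spark itself.
object PathFilter {

  // Keep only the paths that exist. The FileSystem is looked up from each
  // path's own scheme, so one Seq can mix hdfs://, s3a:// and local paths.
  def existingPaths(paths: Seq[String], conf: Configuration): Seq[String] =
    paths.filter { raw =>
      val p = new Path(raw)
      p.getFileSystem(conf).exists(p)
    }

  // Read the surviving paths as Parquet, or return None if nothing is left.
  def readExistingParquet(spark: SparkSession, paths: Seq[String]): Option[DataFrame] = {
    val found = existingPaths(paths, spark.sparkContext.hadoopConfiguration)
    if (found.isEmpty) None else Some(spark.read.parquet(found: _*))
  }
}
```

Usage would look something like PathFilter.readExistingParquet(spark, paths), with the caller deciding what to do in the None case. Note that each exists call is a separate round trip to the filesystem, so for a very long list of S3 paths this check itself can take a while.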
