In Spark we can't control the name of the file written to the directory.
First write the data to the HDFS directory, then rename the part file to the desired name using the HDFS FileSystem API.
Example:
In PySpark:
#sample dataframe
l = [("a", 1)]
ll = ["id", "sa"]
df = spark.createDataFrame(l, ll)

hdfs_dir = "/folder/" #your hdfs directory
new_filename = "my_name.json" #new filename

df.coalesce(1).write.format("json").mode("overwrite").save(hdfs_dir)

#get a handle on the HDFS filesystem via the JVM gateway
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
#list files in the directory
list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dir))

#get the name of the part file written by Spark
file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

#rename the part file to the new filename
Path = spark._jvm.org.apache.hadoop.fs.Path
fs.rename(Path(hdfs_dir + file_name), Path(hdfs_dir + new_filename))
If you also want to delete the _SUCCESS marker files Spark writes to the directory, use fs.delete.
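For example, a minimal sketch reusing the fs, Path, and hdfs_dir variables from above (the two-argument delete takes the path and a recursive flag):

#delete the _SUCCESS marker file; False means non-recursive
fs.delete(Path(hdfs_dir + "_SUCCESS"), False)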
In Scala:
//sample dataframe
val df = Seq(("a", 1)).toDF("id", "sa")
df.show(false)

import org.apache.hadoop.fs._
val hdfs_dir = "/folder/"
val new_filename = "new_json.json"

df.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)

val fs = FileSystem.get(sc.hadoopConfiguration)
//find the part file written by Spark
val f = fs.globStatus(new Path(s"${hdfs_dir}*"))
  .filter(_.getPath.getName.startsWith("part-"))
  .map(_.getPath.getName)
  .mkString
//rename it, then delete the _SUCCESS marker (false = non-recursive)
fs.rename(new Path(s"${hdfs_dir}${f}"), new Path(s"${hdfs_dir}${new_filename}"))
fs.delete(new Path(s"${hdfs_dir}_SUCCESS"), false)