I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.
I have a Scala script that takes raw data from S3, processes it, and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model, but if I want to use something else, I presume it is best to produce a single CSV output file.
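For context, the job looks roughly like this (just a sketch; the app name, bucket paths and the actual transformations are placeholders, and I am on Spark 1.x with the Databricks spark-csv package):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("prepare-training-data"))
val sqlContext = new SQLContext(sc)

// Read the raw data from S3 (path is a placeholder)
val raw = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("s3n://my-bucket/raw/")

// ... the actual processing happens here ...
val processed = raw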
Currently, as I do not want to use repartition(1) or coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing, but since it just concatenates the contents of the job's output files, I am running into a small problem: I need a single row of headers in the data file for training the prediction model.
If I use .option("header", "true") with spark-csv, it writes the header to every output file, so after merging I have as many header lines in the data as there were output files. But if the header option is false, it does not add any headers at all.
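The write step is roughly this (again a sketch; the output path is a placeholder), and every part-xxxxx file under the output directory then starts with its own header row:

processed.write
  .format("com.databricks.spark.csv")
  .option("header", "true")        // each part file gets its own header row
  .save("s3n://my-bucket/processed/")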
Now I found an option to merge the files inside the Scala script with the Hadoop API's FileUtil.copyMerge. I tried this in spark-shell with the code below.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val configuration = new Configuration()
val fs = FileSystem.get(configuration)

// Merge every file under "smallheaders" into the single file "/home/hadoop/smallheaders2",
// keeping the source files (deleteSource = false) and adding nothing between them ("")
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?
I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the header multiple times, not once.
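For reference, this is the variant I mean (assuming df is the DataFrame that was written out; as far as I understand, copyMerge appends the addString after every source file it copies, which would explain why the header ends up repeated):

// Same merge as before, but passing the header line as copyMerge's addString argument
FileUtil.copyMerge(fs, new Path("smallheaders"), fs,
  new Path("/home/hadoop/smallheaders2"), false, configuration,
  df.columns.mkString(","))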