I have a set of files. The paths to the files are saved in a file, say all_files.txt. Using Apache Spark, I need to do an operation on all the files and club the results.
The steps that I want to do are:
- Create an RDD by reading all_files.txt
- For each line in all_files.txt (each line is a path to some file), read the contents of that file into a single RDD
- Then do an operation on all the contents
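To make the intent concrete, here is a minimal Spark-free sketch of the same three steps using plain Python collections (the file names and the upper() operation are hypothetical stand-ins, just to show the flattening I expect flatMap to perform):

```python
# Local, Spark-free analogue of the intended pipeline.
# File names and upper() are hypothetical stand-ins for illustration.
import os
import tempfile

def build_inputs(tmpdir):
    """Create two data files plus an all_files.txt listing their paths."""
    paths = []
    for name, lines in [("a.txt", ["hello", "world"]), ("b.txt", ["spark"])]:
        p = os.path.join(tmpdir, name)
        with open(p, "w") as f:
            f.write("\n".join(lines))
        paths.append(p)
    listing = os.path.join(tmpdir, "all_files.txt")
    with open(listing, "w") as f:
        f.write("\n".join(paths))
    return listing

def run_pipeline(listing):
    # Step 1: read all_files.txt -> one element per file path
    with open(listing) as f:
        file_paths = f.read().splitlines()
    # Step 2: flatten -> club every line of every file into one collection
    all_lines = [line for p in file_paths
                 for line in open(p).read().splitlines()]
    # Step 3: apply an operation to each line (upper() is a stand-in)
    return [line.upper() for line in all_lines]

with tempfile.TemporaryDirectory() as d:
    result = run_pipeline(build_inputs(d))
    print(result)  # ['HELLO', 'WORLD', 'SPARK']
```

This is the same shape I am trying to express in Spark with map followed by two flatMap calls.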
This is the code I wrote for the same:
    def return_contents_from_file(file_name):
        return spark.read.text(file_name).rdd.map(lambda r: r[0])

    def run_spark():
        file_name = 'path_to_file'
        spark = SparkSession \
            .builder \
            .appName("PythonWordCount") \
            .getOrCreate()

        counts = (spark.read.text(file_name).rdd
                  .map(lambda r: r[0])  # this line is supposed to return the paths to each file
                  .flatMap(return_contents_from_file)  # here I am expecting to club all the contents of all files
                  .flatMap(do_operation_on_each_line_of_all_files))  # here I am expecting to do an operation on each line of all files
This is throwing the error:
line 323, in get_return_value py4j.protocol.Py4JError: An error
occurred while calling o25.__getnewargs__. Trace: py4j.Py4JException:
Method __getnewargs__([]) does not exist at
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272) at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79) at
py4j.GatewayConnection.run(GatewayConnection.java:214) at
java.lang.Thread.run(Thread.java:745)
Can someone please tell me what I am doing wrong and how I should proceed further? Thanks in advance.