I am pretty new to Spark, so I could use some help from you.
I have a couple of JSONL files like these in an S3 bucket; each of them has a field containing the name of its own file (more on that later):
{"id": "abc-123-abc", "first_name": "John", "last_name": "Simonis", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"id": "def-563-abc", "first_name": "Mary", "last_name": "Culkin", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"id": "abc-532-def", "first_name": "James", "s3_original_file": "s3://bucket/prefix/file2.jsonl"}
{"id": "abc-445-abc", "first_name": "Fiona", "last_name": "Goodwill", "s3_original_file": "s3://bucket/prefix/file3.jsonl"}
{"id": "abc-167-def", "last_name": "Matz", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "ghj-134-abc", "first_name": "Adam", "last_name": "Gleason", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "abc-523-abc", "first_name": "Phil", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "ghj-823-abc", "first_name": "Jack", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file5.jsonl"}
{"id": "abc-128-abc", "first_name": "Mary", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"id": "abc-124-ghj", "last_name": "Foster", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"id": "ghj-133-abc", "first_name": "Julius", "last_name": "Bull", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"id": "abc-723-abc", "first_name": "Gareth", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file7.jsonl"}
Since I read all the files in the bucket into a single DataFrame, I lose the relationship between each record and its original file, hence the need for that field.
df = spark.read.json('s3://bucket/prefix')
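Just for context, the merged DataFrame ends up with a single inferred schema across all files (as far as I can tell everything comes in as nullable strings), roughly:
df.printSchema()
# root
#  |-- first_name: string (nullable = true)
#  |-- id: string (nullable = true)
#  |-- last_name: string (nullable = true)
#  |-- s3_original_file: string (nullable = true)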
I do some transformations with an RDD map and obtain an updated DataFrame with an extra field added to some of the records (a simplified sketch of my_transformations follows the sample output below):
df2 = df.rdd.map(lambda x: my_transformations(x)).toDF()  # back to a DataFrame so the groupBy below works
{"additional_field": "blabla", "id": "abc-123-abc", "first_name": "John", "last_name": "Simonis", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"additional_field": "blabla", "id": "def-563-abc", "first_name": "Mary", "last_name": "Culkin", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"additional_field": "blabla", "id": "abc-532-def", "first_name": "James", "s3_original_file": "s3://bucket/prefix/file2.jsonl"}
{"id": "abc-445-abc", "first_name": "Fiona", "last_name": "Goodwill", "s3_original_file": "s3://bucket/prefix/file3.jsonl"}
{"additional_field": "blabla", "id": "abc-167-def", "last_name": "Matz", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"additional_field": "blabla", "id": "ghj-134-abc", "first_name": "Adam", "last_name": "Gleason", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "abc-523-abc", "first_name": "Phil", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"additional_field": "blabla", "id": "ghj-823-abc", "first_name": "Jack", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file5.jsonl"}
{"id": "abc-128-abc", "first_name": "Mary", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"additional_field": "blabla", "id": "abc-124-ghj", "last_name": "Foster", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"additional_field": "blabla", "id": "ghj-133-abc", "first_name": "Julius", "last_name": "Bull", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"additional_field": "blabla", "id": "abc-723-abc", "first_name": "Gareth", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file7.jsonl"}
I then need to regroup them by "s3_original_file"
grouped = df2.groupBy(df2.s3_original_file)
Since I need to rewrite the files keeping the original association (I can't use df.write.json because that loses the association; I'll write with df.foreach() and boto3 inside the lambda instead), I build the aggregation by passing every column name except the grouping column:
from pyspark.sql import functions as F

def fetch_columns(dataframe, grouping):
    # collect_list every column except the one we group by
    output = []
    columns = dataframe.columns
    for column in columns:
        if column != grouping:
            output.append(F.collect_list(column).alias(column))
    return output
resultDF = grouped.agg(*fetch_columns(df2, 's3_original_file'))
Then I need to save the resulting DataFrame's Rows as JSON lines back into their specific files, which I plan to do inside the save_back_to_s3 function.
resultDF.foreach(lambda x: save_back_to_s3(x))
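save_back_to_s3 is not written yet; my rough idea, assuming each aggregated Row ends up with the target path in s3_original_file and a records column holding the JSON lines (as in the desired output further down), is something like:
import boto3

def save_back_to_s3(row):
    # sketch only: split "s3://bucket/key" into bucket and key, then
    # upload the grouped records as one JSONL object (creating the
    # client per row is wasteful, but it keeps the sketch short)
    bucket, key = row.s3_original_file.replace('s3://', '', 1).split('/', 1)
    body = '\n'.join(row.records)  # assumes `records` is a list of JSON strings
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=body.encode('utf-8'))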
The problem is:
I get an aggregated list of values per column; instead, I would like a single column containing the list of grouped rows. Also, nothing takes care of possible null values (collect_list skips them), so the lists get out of sync and the ordering is messed up. I want null columns preserved so I can tell that the data does not exist.
>>> resultDF.show(20, False)
+------------------------------+----------------+--------------+---------------------------------------+----------------------+
|s3_original_file |additional_field|first_name |id |last_name |
+------------------------------+----------------+--------------+---------------------------------------+----------------------+
|s3://bucket/prefix/file3.jsonl|[] |[Fiona] |[abc-445-abc] |[Goodwill] |
|s3://bucket/prefix/file7.jsonl|[blabla] |[Gareth] |[abc-723-abc] |[Smith] |
|s3://bucket/prefix/file5.jsonl|[blabla] |[Jack] |[ghj-823-abc] |[Smith] |
|s3://bucket/prefix/file4.jsonl|[blabla, blabla]|[Adam, Phil] |[abc-167-def, ghj-134-abc, abc-523-abc]|[Matz, Gleason, Smith]|
|s3://bucket/prefix/file6.jsonl|[blabla, blabla]|[Mary, Julius]|[abc-128-abc, abc-124-ghj, ghj-133-abc]|[Foster, Bull] |
|s3://bucket/prefix/file1.jsonl|[blabla, blabla]|[John, Mary] |[abc-123-abc, def-563-abc] |[Simonis, Culkin] |
|s3://bucket/prefix/file2.jsonl|[blabla] |[James] |[abc-532-def] |[] |
+------------------------------+----------------+--------------+---------------------------------------+----------------------+
Is it possible to have a resulting DataFrame like this one instead?
+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3_original_file |records |
+------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3://bucket/prefix/file3.jsonl|{"id": "abc-445-abc", "first_name": "Fiona", "last_name": "Goodwill", "s3_original_file": "s3://bucket/prefix/file3.jsonl"} |
|s3://bucket/prefix/file7.jsonl|{"additional_field": "blabla", "id": "abc-723-abc", "first_name": "Gareth", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file7.jsonl"} |
|s3://bucket/prefix/file5.jsonl|{"additional_field": "blabla", "id": "ghj-823-abc", "first_name": "Jack", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file5.jsonl"} |
|s3://bucket/prefix/file4.jsonl|{"additional_field": "blabla", "id": "ghj-134-abc", "first_name": "Adam", "last_name": "Gleason", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "abc-523-abc", "first_name": "Phil", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file4.jsonl"} |
|s3://bucket/prefix/file6.jsonl|{"id": "abc-128-abc", "first_name": "Mary", "