Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

python - Apache Spark Aggregate JSONL DataFrames Grouped By keeping null values

I am pretty new to Spark, so I could use some help from you.

I have a number of JSONL files like these in an S3 bucket. Each one has a field containing the name of its own file (more on that later):

{"id": "abc-123-abc", "first_name": "John", "last_name": "Simonis", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"id": "def-563-abc", "first_name": "Mary", "last_name": "Culkin", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"id": "abc-532-def", "first_name": "James", "s3_original_file": "s3://bucket/prefix/file2.jsonl"}
{"id": "abc-445-abc", "first_name": "Fiona", "last_name": "Goodwill", "s3_original_file": "s3://bucket/prefix/file3.jsonl"}
{"id": "abc-167-def", "last_name": "Matz", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "ghj-134-abc", "first_name": "Adam", "last_name": "Gleason", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "abc-523-abc", "first_name": "Phil", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "ghj-823-abc", "first_name": "Jack", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file5.jsonl"}
{"id": "abc-128-abc", "first_name": "Mary", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"id": "abc-124-ghj", "last_name": "Foster", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"id": "ghj-133-abc", "first_name": "Julius", "last_name": "Bull", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"id": "abc-723-abc", "first_name": "Gareth", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file7.jsonl"}

Since I read all of the bucket's files into a single dataframe, I lose the relationship between each record and its original file; hence the need for that field.

df = spark.read.json('s3://bucket/prefix')

I apply some transformations via a map over the underlying RDD and obtain an updated dataframe with some fields added to some of the records:

df2 = df.rdd.map(lambda x: my_transformations(x)).toDF()  # back to a DataFrame so groupBy works later

{"additional_field": "blabla", "id": "abc-123-abc", "first_name": "John", "last_name": "Simonis", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"additional_field": "blabla", "id": "def-563-abc", "first_name": "Mary", "last_name": "Culkin", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}
{"additional_field": "blabla", "id": "abc-532-def", "first_name": "James", "s3_original_file": "s3://bucket/prefix/file2.jsonl"}
{"id": "abc-445-abc", "first_name": "Fiona", "last_name": "Goodwill", "s3_original_file": "s3://bucket/prefix/file3.jsonl"}
{"additional_field": "blabla", "id": "abc-167-def", "last_name": "Matz", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"additional_field": "blabla", "id": "ghj-134-abc", "first_name": "Adam", "last_name": "Gleason", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"id": "abc-523-abc", "first_name": "Phil", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}
{"additional_field": "blabla", "id": "ghj-823-abc", "first_name": "Jack", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file5.jsonl"}
{"id": "abc-128-abc", "first_name": "Mary", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"additional_field": "blabla", "id": "abc-124-ghj", "last_name": "Foster", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"additional_field": "blabla", "id": "ghj-133-abc", "first_name": "Julius", "last_name": "Bull", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}
{"additional_field": "blabla", "id": "abc-723-abc", "first_name": "Gareth", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file7.jsonl"}

I then need to regroup them by "s3_original_file"

grouped = df2.groupBy(df2.s3_original_file)

And since I need to rewrite the files while keeping the original association, I can't use df.write.json, because that loses the association; instead I'll write with df.foreach() and boto3 inside the lambda. So I aggregate the data by collecting every column except the grouping one:

from pyspark.sql import functions as F

def fetch_columns(dataframe, grouping):
    """Collect every column except the grouping column."""
    output = []
    for column in dataframe.columns:
        if column != grouping:
            output.append(F.collect_list(column).alias(column))
    return output

resultDF = grouped.agg(*fetch_columns(df2, 's3_original_file'))

Then I need to save each resulting Row as JSON lines into its specific file, which I would do inside the save_back_to_s3 function:

resultDF.foreach(lambda x: save_back_to_s3(x))
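For completeness, here is a hedged sketch of what save_back_to_s3 (the name comes from the question; the body is my guess) could look like, assuming the aggregation produces a single records column holding one JSON string per original record. The boto3 import sits inside the function so each Spark executor imports it on its own:

```python
from urllib.parse import urlparse

def rows_to_jsonl(records):
    # Join the collected JSON strings back into JSONL file content.
    return "\n".join(records) + "\n"

def save_back_to_s3(row):
    # Imported here so the function is self-contained on Spark executors.
    import boto3

    target = urlparse(row.s3_original_file)  # e.g. s3://bucket/prefix/file1.jsonl
    boto3.client("s3").put_object(
        Bucket=target.netloc,                # "bucket"
        Key=target.path.lstrip("/"),         # "prefix/file1.jsonl"
        Body=rows_to_jsonl(row.records).encode("utf-8"),
    )
```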

The problem is that I get an aggregated list of values per column, whereas I would like a single column holding the list of grouped rows. Moreover, nothing takes care of null values, which messes up the ordering across the per-column lists (note below how file4 has three ids but only two first_names). I want null columns preserved so I can tell that the data does not exist.

>>> resultDF.show(20, False)
+------------------------------+----------------+--------------+---------------------------------------+----------------------+
|s3_original_file              |additional_field|first_name    |id                                     |last_name             |
+------------------------------+----------------+--------------+---------------------------------------+----------------------+
|s3://bucket/prefix/file3.jsonl|[]              |[Fiona]       |[abc-445-abc]                          |[Goodwill]            |
|s3://bucket/prefix/file7.jsonl|[blabla]        |[Gareth]      |[abc-723-abc]                          |[Smith]               |
|s3://bucket/prefix/file5.jsonl|[blabla]        |[Jack]        |[ghj-823-abc]                          |[Smith]               |
|s3://bucket/prefix/file4.jsonl|[blabla, blabla]|[Adam, Phil]  |[abc-167-def, ghj-134-abc, abc-523-abc]|[Matz, Gleason, Smith]|
|s3://bucket/prefix/file6.jsonl|[blabla, blabla]|[Mary, Julius]|[abc-128-abc, abc-124-ghj, ghj-133-abc]|[Foster, Bull]        |
|s3://bucket/prefix/file1.jsonl|[blabla, blabla]|[John, Mary]  |[abc-123-abc, def-563-abc]             |[Simonis, Culkin]     |
|s3://bucket/prefix/file2.jsonl|[blabla]        |[James]       |[abc-532-def]                          |[]                    |
+------------------------------+----------------+--------------+---------------------------------------+----------------------+

Is it possible to have a resulting dataframe like this one instead?

+------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3_original_file              |records                                                                                                                                               |
+------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3://bucket/prefix/file3.jsonl|{"id": "abc-445-abc", "first_name": "Fiona", "last_name": "Goodwill", "s3_original_file": "s3://bucket/prefix/file3.jsonl"}                           |
|s3://bucket/prefix/file7.jsonl|{"additional_field": "blabla", "id": "abc-723-abc", "first_name": "Gareth", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file7.jsonl"}|
|s3://bucket/prefix/file5.jsonl|{"additional_field": "blabla", "id": "ghj-823-abc", "first_name": "Jack", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file5.jsonl"}  |
|s3://bucket/prefix/file4.jsonl|{"additional_field": "blabla", "id": "abc-167-def", "last_name": "Matz", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}                         |
|                              |{"additional_field": "blabla", "id": "ghj-134-abc", "first_name": "Adam", "last_name": "Gleason", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}|
|                              |{"id": "abc-523-abc", "first_name": "Phil", "last_name": "Smith", "s3_original_file": "s3://bucket/prefix/file4.jsonl"}                                |
|s3://bucket/prefix/file6.jsonl|{"id": "abc-128-abc", "first_name": "Mary", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}                                                      |
|                              |{"additional_field": "blabla", "id": "abc-124-ghj", "last_name": "Foster", "s3_original_file": "s3://bucket/prefix/file6.jsonl"}                       |
|                              |{"additional_field": "blabla", "id": "ghj-133-abc", "first_name": "Julius", "last_name": "Bull", "s3_original_file": "s3://bucket/prefix/file6.jsonl"} |
|s3://bucket/prefix/file1.jsonl|{"additional_field": "blabla", "id": "abc-123-abc", "first_name": "John", "last_name": "Simonis", "s3_original_file": "s3://bucket/prefix/file1.jsonl"}|
|                              |{"additional_field": "blabla", "id": "def-563-abc", "first_name": "Mary", "last_name": "Culkin", "s3_original_file": "s3://bucket/prefix/file1.jsonl"} |
|s3://bucket/prefix/file2.jsonl|{"additional_field": "blabla", "id": "abc-532-def", "first_name": "James", "s3_original_file": "s3://bucket/prefix/file2.jsonl"}                       |
+------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+


1 Reply

You can create a struct column composed of any columns you wish to include, and then use the to_json function to transform it into a single JSON string for export:

scala> val df = Seq((1, "a", Seq("a", "b", "c")), (2, "b", Seq("d", "e", "f"))).toDF("x", "y", "z")
df: org.apache.spark.sql.DataFrame = [x: int, y: string ... 1 more field]

scala> val df_json = df.select(to_json(struct($"x", $"y", $"z")).as("json_field"))
df_json: org.apache.spark.sql.DataFrame = [json_field: string]

scala> df_json.show(false)
+---------------------------------+
|json_field                       |
+---------------------------------+
|{"x":1,"y":"a","z":["a","b","c"]}|
|{"x":2,"y":"b","z":["d","e","f"]}|
+---------------------------------+
