Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - PySpark and broadcast join example

I am using Spark 1.3.

# Read from text file, parse it and then do some basic filtering to get data1
data1.registerTempTable('data1')

# Read from text file, parse it and then do some basic filtering to get data2
data2.registerTempTable('data2')

# Perform join
data_joined = data1.join(data2, data1.id == data2.id)

My data is quite skewed, and data2 (a few KB) is far smaller than data1 (tens of GB), so the join performs badly. I was reading about broadcast joins, but I'm not sure how to do one with the Python API.



1 Reply


Spark 1.3 doesn't support broadcast joins on DataFrames. In Spark >= 1.5.0 you can use the broadcast function to request a broadcast join:

from pyspark.sql.functions import broadcast

data1.join(broadcast(data2), data1.id == data2.id)

For older versions the only option is to convert to an RDD and apply the same logic as in other languages. Roughly something like this:

from pyspark.sql import Row
from pyspark.sql.types import StructType

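# Create a dictionary where keys are join keys
# and values are lists of rows, then broadcast it to the executors.
# Go through .rdd explicitly and turn each group into a plain list.
data2_bd = sc.broadcast(
    data2.rdd.map(lambda r: (r.id, r)).groupByKey().mapValues(list).collectAsMap())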


# Define a new row with fields from both DFs
output_row = Row(*data1.columns + data2.columns)

# And an output schema
output_schema = StructType(data1.schema.fields + data2.schema.fields)

# Given row x, extract a list of corresponding rows from broadcast
# and output a list of merged rows
def gen_rows(x):
    return [output_row(*x + y) for y in data2_bd.value.get(x.id, [])]

# flatMap and create a new data frame
joined = data1.rdd.flatMap(gen_rows).toDF(output_schema)
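
If it helps to see the idea without a cluster, the RDD version above boils down to a map-side hash join: build a dictionary from the small side once, then stream the large side and look each row up. Here is a minimal plain-Python sketch of that logic. The Big and Small row types and the sample data are made up for illustration and are not part of the original question or of any Spark API.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-ins for rows of data1 (large) and data2 (small)
Big = namedtuple("Big", ["id", "value"])
Small = namedtuple("Small", ["id", "label"])


def build_lookup(small_rows):
    """Group the small side by join key (the role of the broadcast dict)."""
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row.id].append(row)
    return dict(lookup)


def broadcast_join(big_rows, lookup):
    """Inner join: stream the big side and probe the in-memory lookup."""
    for x in big_rows:
        # Rows with no match on the small side produce no output
        for y in lookup.get(x.id, []):
            yield tuple(x) + tuple(y)


big = [Big(1, "a"), Big(1, "b"), Big(2, "c"), Big(3, "d")]
small = [Small(1, "one"), Small(2, "two")]

joined = list(broadcast_join(big, build_lookup(small)))
```

Since the large side is never regrouped by key, a heavily skewed key does not pile up in a single partition. The trade-off is that the whole lookup has to fit in memory on every worker, which is fine for a table of a few KB.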
