
apache spark - Generate multiple outputs from a dataframe, reading data only once

I have multiple input files and I need to apply different processing rules and write to multiple output files.

How can I do this so that the input files are read only once, while still applying different filtering, grouping, etc. and saving the results to different output files?

Something similar to the diagram below:

          INPUT
          FILES
            |
            |
           / \
          /   \
         /     \
    FILTER X  FILTER Y
        |        |
        |        |
  GROUP BY A  GROUP BY B
        |        |
        |        |
     OUTPUT    OUTPUT
     FILE 1    FILE 2

I tried code similar to the snippet below, but it seems to read the input files multiple times.

rd = spark.read.format('dbf').load(os.path.join(
    sih_data,
    'RD??{10,11,12,13,14,15,16,17,18,19,20,21}??.{DBC,dbc}'
))


out1 = rd.where(rd['UF_ZI'] == '52')
out1 = out1.groupBy('UF_ZI').count()
out1.write.format("com.databricks.spark.csv") \
    .option("header", "true").save("out1")

out2 = rd.where(rd['IDENT'] == '1')
out2 = out2.groupBy('MUNIC_RES').count()
out2.write.format("com.databricks.spark.csv") \
    .option("header", "true").save("out2")
Question from: https://stackoverflow.com/questions/65861768/generate-multiple-outputs-from-a-dataframe-reading-data-only-once


1 Reply


To answer your question directly: as mentioned in the comments, you can call .cache() once you have read the data and keep the rest of your code exactly as it is. cache() is just a shortcut for persist('MEMORY_AND_DISK'), so the data will be kept in memory if it fits, with the remainder spilled to disk.
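For instance, a minimal sketch of that approach, reusing the reader and the two pipelines from the question verbatim (the 'dbf' source, the sih_data path and the column names all come from it):

rd = spark.read.format('dbf').load(os.path.join(
    sih_data,
    'RD??{10,11,12,13,14,15,16,17,18,19,20,21}??.{DBC,dbc}'
)).cache()  # for DataFrames this is equivalent to .persist(StorageLevel.MEMORY_AND_DISK)

# The first action triggers the read and materialises the cache...
rd.where(rd['UF_ZI'] == '52').groupBy('UF_ZI').count() \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true").save("out1")

# ...and the second one reuses the cached data instead of re-reading the files.
rd.where(rd['IDENT'] == '1').groupBy('MUNIC_RES').count() \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true").save("out2")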

That being said, it is worth thinking about what you are doing before deciding anything. I don't know the dbf format, but if it supports predicate pushdown and your filters are quite restrictive, it could be better to leave your code as it is so that less data is loaded (predicate pushdown on cached data is less effective in Spark). If the source does not support predicate pushdown, or if you have many filters, other approaches become interesting.
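One quick way to check whether pushdown actually happens is to look at the physical plan (a sketch; the column name is taken from the question, and whether a "PushedFilters" entry appears depends on what the dbf data source supports):

# If the source supports pushdown, the predicate shows up inside the scan node
# (typically as "PushedFilters: [...]"); otherwise it appears as a separate
# Filter step applied after a full read of the files.
rd.where(rd['UF_ZI'] == '52').explain(True)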

For instance, you could wrap all your filters into a single job to (1) avoid the per-job overhead and (2) read the data only once.

from pyspark.sql import functions as F

# You can define as many filters as you like, as a list of 3-tuples:
# the 1st element is the id of the filter, the 2nd the filter itself
# and the 3rd the grouping column.
filters = [(1, rd['UF_ZI'] == '52', 'UF_ZI'), (2, rd['IDENT'] == '1', 'MUNIC_RES')]

# Then you define an array of structs. If the filter passes, the element is
# a struct with the grouping column and the filter id. If not, it is null.
cols = [F.when(filter,
            F.struct(F.col(group).alias("group"),
                     F.lit(id).alias("filter")))
        for (id, filter, group) in filters]

# Finally, we explode the array of structs defined just before and filter out
# the null values that correspond to non-matching filters.
(rd
  .withColumn("s", F.explode(F.array(*cols)))
  .where(F.col('s').isNotNull())
  .groupBy(F.col("s.group").alias("group"), F.col("s.filter").alias("filter"))
  .count()
  .write.partitionBy("filter").csv("test.csv"))

# In Spark >= 2.4, you could avoid the filter and use array_except to remove
# the null values before exploding the array. It might be more efficient.
(rd
  .withColumn("s", F.explode(F.array_except(
                                 F.array(*cols),
                                 F.array(F.lit(None))
  )))
  .groupBy(F.col("s.group").alias("group"), F.col("s.filter").alias("filter"))
  .count()
  .write.partitionBy("filter").csv("test.csv"))

Having a look at test.csv, it looks like this:

> ls test.csv
'filter=1'  'filter=2'
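If you later read that output back, Spark rebuilds the filter column from the partition directories (a small sketch; header/schema handling is up to you, since the example above writes CSV without headers):

# Each sub-directory (filter=1, filter=2, ...) becomes a value of the
# "filter" column of the resulting DataFrame.
spark.read.csv("test.csv").show()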
