To answer your question specifically: as mentioned in the comments, you can call .cache()
once you have read the data and keep the rest of your code unchanged. For DataFrames, cache()
is just a shortcut for persist(StorageLevel.MEMORY_AND_DISK),
so your data is stored in memory if it fits, and whatever does not fit spills to disk.
That being said, it is worth thinking about what you are doing before deciding anything. I don't know about the dbf
format, but if it supports predicate pushdown and your filters are quite restrictive, it could be better to leave your code as it is so that less data is loaded (predicate pushdown on cached data is less effective in Spark). If the source does not support predicate pushdown, or if you have many filters, other approaches become interesting.
For instance, you could wrap all your filters into one job to 1. avoid the per-job overhead and 2. read the data only once.
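To make the intent concrete before the Spark code, here is a plain-Python sketch (no Spark needed) of what the combined job computes: each row emits one (group value, filter id) pair per filter it matches, and the pairs are then counted. The column names UF_ZI, IDENT and MUNIC_RES come from your code; the sample rows are made up.

```python
from collections import Counter

# Hypothetical sample rows standing in for the dbf data.
rows = [
    {"UF_ZI": "52", "IDENT": "1", "MUNIC_RES": "520870"},
    {"UF_ZI": "52", "IDENT": "2", "MUNIC_RES": "520870"},
    {"UF_ZI": "33", "IDENT": "1", "MUNIC_RES": "330455"},
]

# Same shape as the Spark version: (filter id, predicate, grouping column).
filters = [
    (1, lambda r: r["UF_ZI"] == "52", "UF_ZI"),
    (2, lambda r: r["IDENT"] == "1", "MUNIC_RES"),
]

# One pass over the data: each row contributes one (group, filter id)
# pair per predicate it satisfies, mirroring explode + groupBy + count.
counts = Counter(
    (row[group], fid)
    for row in rows
    for (fid, pred, group) in filters
    if pred(row)
)
print(counts)
# → Counter({('52', 1): 2, ('520870', 2): 1, ('330455', 2): 1})
```

The single pass over `rows` is the point: all filters are evaluated against each row once, instead of scanning the source once per filter.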
from pyspark.sql import functions as F

# You can define as many filters as you want, as a list of 3-tuples:
# the first element is the id of the filter, the second the filter
# condition itself, and the third the grouping column.
filters = [(1, rd['UF_ZI'] == '52', 'UF_ZI'), (2, rd['IDENT'] == '1', 'MUNIC_RES')]
# Then you define an array of structs: if the filter passes, the element is
# a struct with the grouping column and the filter id; if not, it is null
# (when() without otherwise() yields null).
cols = [F.when(cond,
               F.struct(F.col(group).alias("group"),
                        F.lit(fid).alias("filter")))
        for (fid, cond, group) in filters]
# Finally, we explode the array of filters defined just before and filter out
# the null values that correspond to non-matching filters.
(rd
 .withColumn("s", F.explode(F.array(*cols)))
 .where(F.col("s").isNotNull())
 .groupBy(F.col("s.group").alias("group"), F.col("s.filter").alias("filter"))
 .count()
 .write.partitionBy("filter").csv("test.csv"))
# In Spark >= 2.4, you can avoid the where() and use array_except to remove
# the null values before exploding the array. It might be more efficient.
(rd
 .withColumn("s", F.explode(F.array_except(
     F.array(*cols),
     F.array(F.lit(None))
 )))
 .groupBy(F.col("s.group").alias("group"), F.col("s.filter").alias("filter"))
 .count()
 .write.partitionBy("filter").csv("test.csv"))
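Functionally, the array_except(..., array(null)) step just strips the null entries (and duplicates) from each row's array before the explode, so no null rows are ever produced. A plain-Python sketch of that semantics, assuming array_except's usual null handling:

```python
def array_except(a, b):
    # Plain-Python sketch of Spark's array_except: keep the elements of `a`
    # that do not appear in `b`, dropping duplicates and preserving
    # first-seen order.
    seen, out = set(), []
    for x in a:
        if x not in b and x not in seen:
            seen.add(x)
            out.append(x)
    return out

# One row's array of filter results: None marks a non-matching filter.
print(array_except([("52", 1), None, ("520870", 2)], [None]))
# → [('52', 1), ('520870', 2)]
```

Both variants end up counting the same (group, filter) pairs; the difference is only whether the null entries are dropped before or after the explode.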
Having a look at test.csv
(which is a directory, since the output is partitioned by "filter"), it looks like this:
> ls test.csv
'filter=1' 'filter=2'