
python - How to delete a particular month from a parquet file partitioned by month

I have monthly Revenue data for the last 5 years, and I store the DataFrame for each month in parquet format in append mode, partitioned by the month column. Here is the pseudo-code -

def Revenue(filename):
    df = spark.read.load(filename)
    # ... transformations on df ...
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')

The DataFrame gets stored in parquet format on a monthly basis, with one month=... subdirectory per partition, roughly like this -

/path/Revenue/
├── month=2015-01-01/
├── month=2015-02-01/
├── ...

Question: How can I delete the parquet folder corresponding to a particular month?

One way would be to load all these parquet files into one big DataFrame, use a .where() clause to filter out that particular month, and then save it back in parquet format, partitioned by month, in overwrite mode, like this -

# If we want to remove data for Feb 2015
from pyspark.sql.functions import col, lit

df = spark.read.format('parquet').load('/path/Revenue')
df = df.where(col('month') != lit('2015-02-01'))
# Caution: in Spark, overwriting the same path that is being read from
# can fail or lose data; write to a temporary location first.
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')

But this approach is quite cumbersome.

Another way is to directly delete the folder of that particular month, but I am not sure if that's the right way to approach things, lest we alter the metadata in an unforeseeable way; a sketch of such a direct delete follows below.
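
For reference, here is a minimal sketch of that direct delete through the Hadoop FileSystem API, using PySpark's internal py4j handles (`_jvm`, `_jsc`); the path and partition value are taken from the question. This bypasses Spark completely, so no table metadata is updated:

# A sketch, assuming the SparkSession `spark` from the question.
hadoop = spark.sparkContext._jvm.org.apache.hadoop.fs
conf = spark.sparkContext._jsc.hadoopConfiguration()
partition_path = hadoop.Path('/path/Revenue/month=2015-02-01')
fs = partition_path.getFileSystem(conf)
fs.delete(partition_path, True)  # True = delete recursively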

What would be the right way to delete the parquet data for a particular month?


1 Reply


Spark supports deleting a partition, both data and metadata.
Quoting the Scala source comment:

/**
 * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
 *
 * This removes the data and metadata for this partition.
 * The data is actually moved to the .Trash/Current directory if Trash is configured,
 * unless 'purge' is true, but the metadata is completely lost.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
 */

In your case, there is no backing table. We could register the DataFrame as a temp table and use the above syntax (see the temp table documentation).

From pyspark, we could run the SQL using the syntax shown in this link. Sample:

df = spark.read.format('parquet').load('/path/Revenue')
df.createOrReplaceTempView("tmp")  # registerTempTable is deprecated; this is the Spark 2.x equivalent
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")
