Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.4k views
in Technique[技术] by (71.8m points)

apache spark - pyspark: count distinct over a window

I just tried doing a countDistinct over a window and got this error:

AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926)

Is there a way to do a distinct count over a window in pyspark?

Here's some example code:

from pyspark.sql.window import Window    
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
                    
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()

This is the output I'd like to see:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

EDIT:

As noleto mentions in his answer below, there is now an approx_count_distinct function since pyspark 2.1 that works over a window.


Original Answer

I figured out that I can use a combination of the collect_set and size functions to mimic the functionality of countDistinct over a window:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])

#convert string timestamp to timestamp type             
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))

df.show()

This results in the distinct count of color over the previous week of records:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...