I have a dataset consisting of a timestamp column and a dollars column. I would like to find the average number of dollars per week ending at the timestamp of each row. I was initially looking at the pyspark.sql.functions.window function, but that bins the data by week.
Here's an example:
%pyspark
import datetime
from pyspark.sql import functions as F
df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))
w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()
This results in two records:
| start | end | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|
The window function binned the time series data rather than performing a rolling average.
Is there a way to perform a rolling average where I'll get back a weekly average for each row with a time period ending at the timestampGMT of the row?
EDIT:
Zhang's answer below is close to what I want, but not exactly what I'd like to see.
Here's a better example to show what I'm trying to get at:
%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))
This results in the following dataframe:
dollars timestampGMT rolling_average
25 2017-03-18 11:27:18.0 25
17 2017-03-10 15:27:18.0 15
13 2017-03-15 12:27:18.0 15
I'd like the average to be over the week proceeding the date in the timestampGMT column, which would result in this:
dollars timestampGMT rolling_average
17 2017-03-10 15:27:18.0 17
13 2017-03-15 12:27:18.0 15
25 2017-03-18 11:27:18.0 19
In the above results, the rolling_average for 2017-03-10 is 17, since there are no preceding records. The rolling_average for 2017-03-15 is 15 because it is averaging the 13 from 2017-03-15 and the 17 from 2017-03-10 which falls withing the preceding 7 day window. The rolling average for 2017-03-18 is 19 because it is averaging the 25 from 2017-03-18 and the 13 from 2017-03-10 which falls withing the preceding 7 day window, and it is not including the 17 from 2017-03-10 because that does not fall withing the preceding 7 day window.
Is there a way to do this rather than the binning window where the weekly windows don't overlap?
question from:
https://stackoverflow.com/questions/45806194/pyspark-rolling-average-using-timeseries-data