Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - How to perform a self join with the same row of the previous group (month) to bring in additional columns with different expressions in PySpark

I ran into problems calculating the new columns value1_1 to value4_4 based on the formulas given below.

Input:

Month_no|value1 |value2 |value3 |value4 |
01      |10     |20     |30     |40     |
01      |20     |30     |40     |50     |
01      |30     |40     |50     |60     |
02      |40     |50     |60     |70     |
02      |50     |60     |70     |80     |
02      |60     |70     |80     |90     |
03      |70     |80     |90     |100    |
03      |80     |90     |100    |110    |
03      |90     |100    |110    |120    |

value1_1 and value2_2 should be calculated with the expression: value + previous month's value (value1 for value1_1, value2 for value2_2). For example, for month_no 02, value1_1 for the first row should be month_no 01's first-row value1 (10) + month_no 02's first-row value1 (40) = 50.

value3_3 and value4_4 should be calculated with the expression: (value3 + previous month's value3) / (quarter month number), and likewise with value4 for value4_4.

Quarter month number: the position of the month within its quarter.

Jan: 1
Feb: 2
Mar: 3
Apr: 1
May: 2
Jun: 3
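
The mapping above can be written as a single expression. This is a minimal sketch in plain Python; the function name quarter_month_no is made up for illustration and does not appear in the question.

```python
def quarter_month_no(month_no: int) -> int:
    """Return the position (1, 2 or 3) of a calendar month within its quarter."""
    if not 1 <= month_no <= 12:
        raise ValueError(f"month_no must be in 1..12, got {month_no}")
    # Shift to 0-based, wrap every 3 months, shift back to 1-based.
    return (month_no - 1) % 3 + 1


# Jan..Jun map to 1, 2, 3, 1, 2, 3
print([quarter_month_no(m) for m in range(1, 7)])
```

In Spark the same arithmetic can be applied to the Month_no column instead of a Python integer.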

Output: value1_1 and value2_2 are calculated with one formula, and value3_3 and value4_4 with another.

Month_no|value1 |value2 |value3 |value4 |value1_1|value2_2|value3_3   |value4_4   |
01      |10     |20     |30     |40     |10      |20      |30         |40         |
01      |20     |30     |40     |50     |20      |30      |40         |50         |
01      |30     |40     |50     |60     |30      |40      |50         |60         |
02      |40     |50     |60     |70     |50      |70      |45         |55         |
02      |50     |60     |70     |80     |70      |90      |55         |65         |
02      |60     |70     |80     |90     |90      |110     |65         |75         |
03      |70     |80     |90     |100    |120     |150     |45         |51.66666667|
03      |80     |90     |100    |110    |150     |180     |51.66666667|58.33333333|
03      |90     |100    |110    |120    |180     |210     |58.33333333|65         |

I tried looping over the months, joining each month with the previous one and calculating the new values. But the for loop runs into performance problems with millions of records. Can anyone suggest another approach?



1 Reply


Your question is unclear. However, I will try to answer it based on the data.

Based on your source data, the rows within each month appear to be sorted by something. I will use value1 as the sorting column; you can change it to something else based on your logic. Using this sorting column, I will generate a row_number within each month and use it in the self join.
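
To show the idea without Spark, here is a small plain-Python sketch of the same logic: number the rows within each month by value1, then look up the row with the same number in the previous month. The names rows, numbered and value1_1 are only for illustration, and only the value1 column is used to keep it short.

```python
from itertools import groupby

# (Month_no, value1) pairs taken from the question's input
rows = [
    (1, 10), (1, 20), (1, 30),
    (2, 40), (2, 50), (2, 60),
    (3, 70), (3, 80), (3, 90),
]

# Number the rows within each month, ordered by value1
# (the role row_number() over a window plays in Spark).
numbered = {}
for month, group in groupby(sorted(rows), key=lambda r: r[0]):
    for rownum, (_, value1) in enumerate(group, start=1):
        numbered[(month, rownum)] = value1

# "Self join": find the row with the same rownum in the previous month.
# A missing match counts as 0, like a left join followed by fillna(0).
value1_1 = {
    (month, rownum): value1 + numbered.get((month - 1, rownum), 0)
    for (month, rownum), value1 in numbered.items()
}

print(value1_1[(2, 1)])  # 40 + 10 = 50
```

The key point is that (Month_no - 1, rownum) identifies the matching row, so no loop over months is needed.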

You can try something like the code below to achieve your results. It gives proper results in Spark 2.x; you may have to tweak it to work in your Spark environment. Please note that your formula and your result set do not match for Month_no 3.

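from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number,lit,col,when

#assumption: outside the pyspark shell there is no `sc` yet, so create (or reuse) a session
spark=SparkSession.builder.getOrCreate()
sc=spark.sparkContext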

#storing your source data and forming it as a list of list
data=""" 01    |10     |20     |30     |40    
  01    |20     |30     |40     |50     
  01    |30     |40     |50     |60     
  02    |40     |50     |60     |70     
  02    |50     |60     |70     |80     
  02    |60     |70     |80     |90     
  03    |70     |80     |90     |100    
  03    |80     |90     |100    |110    
  03    |90     |100    |110    |120    """

data01=data.split('\n')
data02=[ item.split('|') for item in data01 ]

#creating variables with column names for convenience
month_no='Month_no'
value1='value1'
value2='value2'
value3='value3'
value4='value4'
crownum="rownum"
qtrMonthNo="qtrMonthNo"

#creating rdd & df based on your data
df=sc.parallelize(data02).toDF(['Month_no','value1','value2','value3','value4'])
sourcedata=df.selectExpr("cast(trim(month_no) as integer) as Month_no","cast(trim(value1) as integer) as value1","cast(trim(value2) as integer) as value2","cast(trim(value3) as integer) as value3","cast(trim(value4) as integer) as value4")

#Adding rownum to join with appropriate row in same month
rownum_window=Window.partitionBy(month_no).orderBy(value1)
df1=sourcedata.withColumn("rownum",row_number().over(rownum_window))

#preparing dataframes for join
df_left=df1 
df_right=df1.select(*[col(colm).alias("r_"+colm)  for colm in df1.columns ])

#joining dataframes
df_joined=df_left.join(df_right,( df_left.Month_no - 1 == df_right.r_Month_no )  & ( df_left.rownum==df_right.r_rownum )  ,"left").fillna(0)
df_joined=df_joined.withColumn(qtrMonthNo,when(df_joined.Month_no % 3 == 0, 3).otherwise(df_joined.Month_no % 3))
#optional: caching is not required for correct results
df_joined.cache()

#calculating value1_1 & value2_2
first_cal=df_joined.select((col("r_value1")+col("value1")).alias("value1_1"),(col("r_value2")+col("value2")).alias("value2_2"),qtrMonthNo,"r_value3","r_value4",*df1.columns)

#calculating value3_3 & value4_4
second_cal=first_cal.select(((col("r_value3")+col("value3")) / col("qtrMonthNo") ).alias("value3_3"),((col("r_value4")+col("value4")) / col("qtrMonthNo") ).alias("value4_4"),*first_cal.columns)

#final dataframe with necessary columns and sorted data
result_df=second_cal.orderBy(month_no,value1).drop(qtrMonthNo,crownum,"r_value3","r_value4")
result_df.show()
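
To make the Month_no 3 mismatch concrete, here is a small plain-Python check (no Spark needed). It computes the four new columns two ways: with the formula as stated (previous month's raw values), and with an assumed alternative reading where the previous month's already computed columns are used. That second reading is only an inference from the numbers in the expected output, not something the question states. The function name compute and its recursive flag are made up for this sketch.

```python
# Input rows from the question: (Month_no, value1, value2, value3, value4),
# in the same order as in the question within each month.
rows = [
    (1, 10, 20, 30, 40), (1, 20, 30, 40, 50), (1, 30, 40, 50, 60),
    (2, 40, 50, 60, 70), (2, 50, 60, 70, 80), (2, 60, 70, 80, 90),
    (3, 70, 80, 90, 100), (3, 80, 90, 100, 110), (3, 90, 100, 110, 120),
]


def compute(rows, recursive):
    """Return {(month, position): (value1_1, value2_2, value3_3, value4_4)}.

    recursive=False adds the previous month's raw values (the stated formula).
    recursive=True adds the previous month's computed columns (assumed reading).
    Rows must arrive in ascending month order.
    """
    raw, out, seen = {}, {}, {}
    for month, *values in rows:
        pos = seen[month] = seen.get(month, 0) + 1  # position within the month
        raw[(month, pos)] = values
        source = out if recursive else raw
        prev = source.get((month - 1, pos), (0, 0, 0, 0))  # no previous month -> zeros
        q = (month - 1) % 3 + 1  # month number within the quarter
        v1, v2, v3, v4 = values
        out[(month, pos)] = (v1 + prev[0], v2 + prev[1],
                             (v3 + prev[2]) / q, (v4 + prev[3]) / q)
    return out


direct = compute(rows, recursive=False)
chained = compute(rows, recursive=True)

print(direct[(3, 1)][0])   # 70 + 40 = 110, the stated formula
print(chained[(3, 1)][0])  # 70 + 50 = 120, the value in the expected output
```

Both readings agree for Month_no 1 and 2. For Month_no 3 only the chained one reproduces the numbers in the question's expected output, so the PySpark code above, which follows the stated formula, will give different values there.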
