
python - pyspark dataframe doesn't change after some processing

I create a DataFrame and use a window function to compute a cumulative value, but after applying the function, df.show() and df.select(...).show() display the rows in different orders.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master('local[8]').appName("SparkByExample.com").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", '50')

# create dataframe
from pyspark.sql.types import StructType, StructField, IntegerType
data = [(1,1,2),
        (1,2,2),
        (1,3,2),   
        (1,2,1),
        (2,1,2),
        (2,3,2),
        (2,2,1),
        (3,1,2),
        (3,3,2),
        (3,2,1)]

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True)])
 
df = spark.createDataFrame(data=data,schema=schema)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   1|   2|   2|
|   1|   3|   2|
|   1|   2|   1|
|   2|   1|   2|
|   2|   3|   2|
|   2|   2|   1|
|   3|   1|   2|
|   3|   3|   2|
|   3|   2|   1|
+----+----+----+
# window function to get the cumulative value
from pyspark.sql.window import Window
w = Window.partitionBy(F.col('col1')).orderBy(F.col('col2'))

# return 1 when the current value differs from the previous (lagged) value
def f(lag_val, current_val):
    value = 0
    if lag_val != current_val:
        value = 1
    return value

# register the UDF so we can use it with our DataFrame
func_udf = F.udf(f, IntegerType())

print(id(df))
df = df.withColumn("new_column", func_udf(F.lag("col3").over(w), df['col3'])).withColumn('new_column2', F.sum('new_column').over(w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0)))
df.show()
+----+----+----+----------+-----------+
|col1|col2|col3|new_column|new_column2|
+----+----+----+----------+-----------+
|   3|   1|   2|         1|          1|
|   3|   2|   1|         1|          2|
|   3|   3|   2|         1|          3|
|   2|   1|   2|         1|          1|
|   2|   2|   1|         1|          2|
|   2|   3|   2|         1|          3|
|   1|   1|   2|         1|          1|
|   1|   2|   2|         0|          1|
|   1|   2|   1|         1|          2|
|   1|   3|   2|         1|          3|
+----+----+----+----------+-----------+
test = df.select('col1','col2','col3')
test.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   2|
|   1|   2|   2|
|   1|   3|   2|
|   1|   2|   1|
|   2|   1|   2|
|   2|   3|   2|
|   2|   2|   1|
|   3|   1|   2|
|   3|   3|   2|
|   3|   2|   1|
+----+----+----+

We can see that the row order of 'col1', 'col2', 'col3' differs between df and test, and that test shows the original order of df. This could be a consequence of how actions and transformations are evaluated, but I am not sure.
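For what it's worth, adding an explicit sort before show() makes both DataFrames display in the same order, which suggests the order is simply not guaranteed without one (a quick sanity check; sorting by col1 and col2 is just an arbitrary choice here):

df.orderBy('col1', 'col2').show()
test.orderBy('col1', 'col2').show()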

question from: https://stackoverflow.com/questions/65930528/pyspark-dataframe-doesnt-change-after-some-processing


1 Reply


DataFrame packages sometimes alter the row order automatically after certain functions are executed; Spark in particular does not guarantee row order, and the partitionBy in your window function triggers a shuffle that can rearrange the rows. For debugging, I suggest you break down your code from:

df = df.withColumn("new_column", func_udf(F.lag("col3").over(w), df['col3'])).withColumn('new_column2', F.sum('new_column').over(w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0)))

to something like this

df = df.withColumn("new_column", func_udf(F.lag("col3").over(w), df['col3']))
df_partition = w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0))
df = df.withColumn('new_column2', F.sum('new_column').over(df_partition)

You can break those lines down into smaller steps until you understand what each function does.
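As a rough sketch of that kind of inspection, explain() prints the physical plan, where the Exchange (shuffle) introduced by partitionBy becomes visible; whether the optimizer prunes the window step out of the narrower select depends on your Spark version:

# print the physical plan: look for the Exchange (shuffle) that partitionBy introduces
df.explain()

# compare with the plan of the narrower select; if the optimizer prunes the unused
# window columns, the Exchange may disappear, which would explain why test keeps
# the original row order
df.select('col1', 'col2', 'col3').explain()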

