I create a dataframe and use a window function to compute a cumulative value, but after applying the function, df.show() and df.select().show() display the rows in different orders.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.master('local[8]').appName("SparkByExample.com").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", '50')
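(Side note, not part of the original question: this setting matters for what follows, because the window's partitionBy later in the code triggers a shuffle, and the shuffled result comes back spread across this many partitions, with no guaranteed row order. A quick sanity check that the setting took effect:)

print(spark.conf.get("spark.sql.shuffle.partitions"))  # '50'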
# create the dataframe
from pyspark.sql.types import StructType, StructField, IntegerType
data = [(1, 1, 2),
        (1, 2, 2),
        (1, 3, 2),
        (1, 2, 1),
        (2, 1, 2),
        (2, 3, 2),
        (2, 2, 1),
        (3, 1, 2),
        (3, 3, 2),
        (3, 2, 1)]
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True)])
df = spark.createDataFrame(data=data, schema=schema)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 1| 2|
| 1| 2| 2|
| 1| 3| 2|
| 1| 2| 1|
| 2| 1| 2|
| 2| 3| 2|
| 2| 2| 1|
| 3| 1| 2|
| 3| 3| 2|
| 3| 2| 1|
+----+----+----+
# window function to compute the cumulative value
from pyspark.sql.window import Window
w = Window.partitionBy(F.col('col1')).orderBy(F.col('col2'))
def f(lag_val, current_val):
    # lag() returns None for the first row of each partition; in Python,
    # None != current_val is True, so that row is flagged with a 1
    value = 0
    if lag_val != current_val:
        value = 1
    return value
# register the UDF so we can use it with our dataframe
func_udf = F.udf(f, IntegerType())
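As an aside (a sketch, not part of the original question): the same flag can be computed without a Python UDF, using built-in column expressions. The null-safe comparison below is my assumption for handling the None that lag() produces on each partition's first row, which the Python UDF above handles implicitly:

# UDF-free sketch: eqNullSafe is False when lag() yields None, so the
# first row of each partition still gets a 1, matching the UDF above
flag = F.when(F.lag("col3").over(w).eqNullSafe(F.col("col3")), 0).otherwise(1)

A built-in expression like this also avoids the Python serialization overhead of a UDF, though it does not change the ordering behavior asked about here.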
print(id(df))
df = df.withColumn(
    "new_column",
    func_udf(F.lag("col3").over(w), df['col3'])
).withColumn(
    "new_column2",
    F.sum('new_column').over(
        w.partitionBy(F.col('col1')).rowsBetween(Window.unboundedPreceding, 0))
)
df.show()
+----+----+----+----------+-----------+
|col1|col2|col3|new_column|new_column2|
+----+----+----+----------+-----------+
| 3| 1| 2| 1| 1|
| 3| 2| 1| 1| 2|
| 3| 3| 2| 1| 3|
| 2| 1| 2| 1| 1|
| 2| 2| 1| 1| 2|
| 2| 3| 2| 1| 3|
| 1| 1| 2| 1| 1|
| 1| 2| 2| 0| 1|
| 1| 2| 1| 1| 2|
| 1| 3| 2| 1| 3|
+----+----+----+----------+-----------+
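As far as I understand (not confirmed in the original post), this reordering is expected: the window's partitionBy shuffles the rows into new partitions, and show() prints rows in whatever partition order it receives. An explicit sort would make the displayed order deterministic:

df.orderBy('col1', 'col2').show()  # pins the display order regardless of partitioning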
test = df.select('col1','col2','col3')
test.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| 1| 2|
| 1| 2| 2|
| 1| 3| 2|
| 1| 2| 1|
| 2| 1| 2|
| 2| 3| 2|
| 2| 2| 1|
| 3| 1| 2|
| 3| 3| 2|
| 3| 2| 1|
+----+----+----+
We can see that the row order of 'col1', 'col2', 'col3' differs between df and test, and that test shows df's original row order even though it was selected from df after the window operation. This could be related to actions and transformations, but I am not sure.
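One way to investigate (a sketch, not from the original post) is to compare the physical plans. If the optimizer prunes the window/UDF columns that test never selects, the select runs without a shuffle, which would explain why test keeps the original row order:

df.explain()    # should show an Exchange introduced by the window's partitionBy
test.explain()  # may show no Exchange if the unused window columns were pruned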
question from:
https://stackoverflow.com/questions/65930528/pyspark-dataframe-doesnt-change-after-some-processing