You can do this without a udf
using a Window
.
Consider the following example:
import pyspark.sql.functions as f
data = [
('a', 5),
('a', 8),
('a', 7),
('b', 1),
('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#| A| B|
#+---+---+
#| a| 5|
#| a| 8|
#| a| 7|
#| b| 1|
#| b| 3|
#+---+---+
Create a Window
to partition by column A
and use this to compute the maximum of each group. Then filter out the rows such that the value in column B
is equal to the max.
from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))
.where(f.col('B') == f.col('maxB'))
.drop('maxB')
.show()
#+---+---+
#| A| B|
#+---+---+
#| a| 8|
#| b| 3|
#+---+---+
Or equivalently using pyspark-sql
:
df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#| A| B|
#+---+---+
#| b| 3|
#| a| 8|
#+---+---+
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…