You can chain when conditions to find which column is equal to the maximum value:
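(For reference, a DataFrame matching the sample output below can be built like this; the setup is reconstructed from that output and is not part of the original answer.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# sample data taken from the output shown further down
df = spark.createDataFrame(
    [(2, 7, 8, 6), (11, 5, 9, 4), (6, 15, 12, 3), (5, 3, 7, 8)],
    ["Alice", "Eleonora", "Mike", "Helen"],
)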
cond = "psf.when" + ".when".join(["(psf.col('" + c + "') == psf.col('max_value'), psf.lit('" + c + "'))" for c in df.columns])
import pyspark.sql.functions as psf
df.withColumn("max_value", psf.greatest(*df.columns))
.withColumn("MAX", eval(cond))
.show()
+-----+--------+----+-----+---------+--------+
|Alice|Eleonora|Mike|Helen|max_value| MAX|
+-----+--------+----+-----+---------+--------+
| 2| 7| 8| 6| 8| Mike|
| 11| 5| 9| 4| 11| Alice|
| 6| 15| 12| 3| 15|Eleonora|
| 5| 3| 7| 8| 8| Helen|
+-----+--------+----+-----+---------+--------+
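If you would rather avoid eval, the same chained condition can be built programmatically with functools.reduce; this is a sketch equivalent to the string-built expression above, not a separate method from the original answer:

from functools import reduce

# fold the columns into one chained when(...) expression
cond = reduce(
    lambda acc, c: acc.when(psf.col(c) == psf.col("max_value"), psf.lit(c)),
    df.columns[1:],
    psf.when(psf.col(df.columns[0]) == psf.col("max_value"), psf.lit(df.columns[0])),
)
df.withColumn("max_value", psf.greatest(*df.columns))\
    .withColumn("MAX", cond)\
    .show()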
OR: explode a map of the columns and filter on the max value:
from itertools import chain

# map each column name to its value, explode, and keep the entries equal to the max
df.withColumn("max_value", psf.greatest(*df.columns))\
    .select("*", psf.posexplode(psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns])))))\
    .filter("max_value = value")\
    .select(df.columns + [psf.col("key").alias("MAX")])\
    .show()
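Here posexplode turns the map into pos, key and value columns (pos is unused, so explode would do just as well), and the filter keeps the entries whose value equals the row maximum; if several columns tie for the maximum you get one output row per tied column. The chain(...) call only interleaves literal column names with their values, so for the sample columns the map expression is equivalent to:

psf.create_map(
    psf.lit("Alice"), psf.col("Alice"),
    psf.lit("Eleonora"), psf.col("Eleonora"),
    psf.lit("Mike"), psf.col("Mike"),
    psf.lit("Helen"), psf.col("Helen"),
)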
OR: using a UDF on a dictionary:
from pyspark.sql.types import StringType

argmax_udf = psf.udf(lambda m: max(m, key=m.get), StringType())

df.withColumn("map", psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns]))))\
    .withColumn("MAX", argmax_udf("map"))\
    .drop("map")\
    .show()
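Per row the UDF receives the map as a plain Python dict, so max(m, key=m.get) simply returns the key holding the largest value (ties resolve to whichever key comes first). On the first sample row:

m = {"Alice": 2, "Eleonora": 7, "Mike": 8, "Helen": 6}
max(m, key=m.get)  # -> 'Mike'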
OR: using a UDF with a parameter:
from pyspark.sql.types import StringType

def argmax(cols, *args):
    # return the name of the first column whose value equals the row maximum
    return [c for c, v in zip(cols, args) if v == max(args)][0]

argmax_udf = lambda cols: psf.udf(lambda *args: argmax(cols, *args), StringType())

df.withColumn("MAX", argmax_udf(df.columns)(*df.columns))\
    .show()
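Because argmax_udf is a small factory over the column list, you can also restrict the comparison to a subset of columns; a hypothetical usage (the MAX_of_two name is made up for illustration):

df.withColumn("MAX_of_two", argmax_udf(["Alice", "Mike"])("Alice", "Mike")).show()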