Before you proceed: This operations is yet another another groupByKey
. While it has multiple legitimate applications it is relatively expensive so be sure to use it only when required.
Not exactly concise or efficient solution but you can use UserDefinedAggregateFunction
introduced in Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Example usage:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
You can also create a Python wrapper as shown in Spark: How to map Python with Scala or Java User Defined Functions?
In practice it can be faster to extract RDD, groupByKey
, mkString
and rebuild DataFrame.
You can get a similar effect by combining collect_list
function (Spark >= 1.6.0) with concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…