Unfortunately there is no case when:

- a Spark `DataFrame` is justified considering the amount of data, and
- transposition of the data is feasible.
You have to remember that a `DataFrame`, as implemented in Spark, is a distributed collection of rows, and each row is stored and processed on a single node. A transposition turns every row into a column, so it has to shuffle every single value across the cluster.
You could express transposition on a `DataFrame` as a `pivot`:
```scala
import org.apache.spark.sql.functions.{array, col, explode, first, lit, struct}
import spark.implicits._

// Turn every value column into a (column-name, value) struct and explode
// them, so each input row yields one long key-value pair per column.
val kv = explode(array(df.columns.tail.map {
  c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")        // one output row per former column
  .pivot("segment_id")  // one output column per former row
  .agg(first($"v"))
  .orderBy($"k")
  .withColumnRenamed("k", "vals")
```
but it is merely toy code with no practical applications. In practice it is no better than collecting the data:
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Collect to the driver, transpose locally, and split the first
// (former segment_id) column off as the new header.
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) =>
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
}

// Former column names become the values of the new "vals" column.
val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }

val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)
```
For a `DataFrame` defined as:
```scala
val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
```
both would give you the desired result:
```
+----+---+---+---+---+
|vals|  1|  2|  3|  4|
+----+---+---+---+---+
|val1|100|  0|  0|  0|
|val2|  0| 50|  0|  0|
|val3|  0|  0|  0|  0|
|val4|  0|  0|  0|  0|
|val5|  0| 20|  0|  0|
|val6|  0|  0|  0|  0|
+----+---+---+---+---+
```
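If you still want the pivot version in a reusable form despite its limitations, it can be wrapped in a small helper. This is just a sketch; `transposeByPivot` and its parameters are illustrative names, not part of any Spark API:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, first, lit, struct}

// Hypothetical helper generalizing the pivot above to any id column.
def transposeByPivot(df: DataFrame, idColumn: String): DataFrame = {
  val valueColumns = df.columns.filter(_ != idColumn)
  val kv = explode(array(valueColumns.map { c =>
    struct(lit(c).alias("k"), col(c).alias("v"))
  }: _*))

  df.withColumn("kv", kv)
    .select(col(idColumn), col("kv.k"), col("kv.v"))
    .groupBy(col("k"))
    .pivot(idColumn)
    .agg(first(col("v")))
    .orderBy(col("k"))
    .withColumnRenamed("k", "vals")
}

// transposeByPivot(df, "segment_id").show()
```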
That being said, if you need efficient transposition on a distributed data structure, you'll have to look elsewhere. There are a number of structures, including the core `CoordinateMatrix` and `BlockMatrix`, which can distribute data across both dimensions and can be transposed.
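As a minimal sketch of that route, assuming the same `df` as above: the index arithmetic and label handling below are illustrative assumptions, since distributed matrices only work with numeric indices and you have to track row and column labels yourself.

```scala
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Column labels have to be carried separately; the matrix only sees indices.
val labels = df.columns.tail

// One MatrixEntry per (segment, value column) cell; segment_id 1..4 maps
// to row indices 0..3 (an assumption that holds for the example data).
val entries = df.rdd.flatMap { row =>
  val i = row.getInt(0) - 1L
  (1 until row.length).map(j => MatrixEntry(i, j - 1L, row.getInt(j).toDouble))
}

// transpose swaps row and column indices without collecting anything to
// the driver; toBlockMatrix() would give the BlockMatrix variant instead.
val transposed: CoordinateMatrix = new CoordinateMatrix(entries).transpose
```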