Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
627 views
in Technique[技术] by (71.8m points)

scala - Spark: Transpose DataFrame Without Aggregating

I have looked at a number of questions online, but they don't seem to do what I'm trying to achieve.

I'm using Apache Spark 2.0.2 with Scala.

I have a dataframe:

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+

which I want to transpose to

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+

I've tried using pivot() but I couldn't get to the right answer. I ended up looping through my val{x} columns, and pivoting each as per below, but this is proving to be very slow.

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+

Then using union() on each iteration of val{x} to my first dataframe.

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+

Is there a more efficient way of a transpose where I do not want to aggregate data?

Thanks :)

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Unfortunately there is no case when:

  • Spark DataFrame is justified considering amount of data.
  • Transposition of data is feasible.

You have to remember that DataFrame, as implemented in Spark, is a distributed collection of rows and each row is stored and processed on a single node.

You could express transposition on a DataFrame as pivot:

val kv = explode(array(df.columns.tail.map { 
  c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")
  .agg(first($"v"))
  .orderBy($"k")
  .withColumnRenamed("k", "vals")

but it is merely a toy code with no practical applications. In practice it is not better than collecting data:

val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
  }
}

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)

For DataFrame defined as:

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

both would you give you the desired result:

+----+---+---+---+---+
|vals|  1|  2|  3|  4|
+----+---+---+---+---+
|val1|100|  0|  0|  0|
|val2|  0| 50|  0|  0|
|val3|  0|  0|  0|  0|
|val4|  0|  0|  0|  0|
|val5|  0| 20|  0|  0|
|val6|  0|  0|  0|  0|
+----+---+---+---+---+

That being said if you need an efficient transpositions on distributed data structure you'll have to look somewhere else. There is a number of structures, including core CoordinateMatrix and BlockMatrix, which can distribute data across both dimensions and can be transposed.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...