Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Performance: Scala Spark DataFrame explode is slow - looking for an alternative way to create columns and rows from arrays in a column

Scala 2.11.8, Spark 2.0.1

The explode function is very slow, so I am looking for an alternative method. I think it is possible with RDDs and flatMap; any help is greatly appreciated.

I have a UDF that returns a List[(String, String, String, Int)] of varying length. For each row in the DataFrame, I want to create one output row per list element and split each tuple into separate columns.

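import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._  // assumes a SparkSession named spark, as in spark-shell

// Returns one or three (String, String, String, Int) tuples depending on the input
val Udf = udf((s: String) => {
  if (s == "1") List(("a", "b", "c", 0), ("a1", "b1", "c1", 1), ("a2", "b2", "c2", 2))
  else List(("a", "b", "c", 0))
})

val df = Seq(("a", "1"), ("b", "2")).toDF("A", "B")
val df1 = df.withColumn("C", Udf($"B"))    // C holds an array of structs per row
val df2 = df1.select($"A", explode($"C"))  // one row per struct, in a column named "col"
val df3 = df2.withColumn("D", $"col._1").withColumn("E", $"col._2").withColumn("F", $"col._3").withColumn("G", $"col._4")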

// df1: DataFrame after applying the UDF
+---+---+--------------------+
|  A|  B|                   C|
+---+---+--------------------+
|  a|  1|[[a,b,c,0], [a1,b...|
|  b|  2|         [[a,b,c,0]]|
+---+---+--------------------+

// df3: final DataFrame
+---+------------+---+---+---+---+
|  A|         col|  D|  E|  F|  G|
+---+------------+---+---+---+---+
|  a|   [a,b,c,0]|  a|  b|  c|  0|
|  a|[a1,b1,c1,1]| a1| b1| c1|  1|
|  a|[a2,b2,c2,2]| a2| b2| c2|  2|
|  b|   [a,b,c,0]|  a|  b|  c|  0|
+---+------------+---+---+---+---+

This is very slow on many millions of rows: the job takes over 12 hours.


1 Reply


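You can do either of the following:

  • Upgrade Spark to version 2.3 or later, where the explode performance problem tracked as SPARK-21657 should be fixed.
  • Replace the UDF and explode with a typed flatMap over the Dataset: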

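    // Needs import spark.implicits._ for the tuple encoders.
    df.as[(String, String)].flatMap {
      case (a, "1") => Seq(
        (a, "a", "b", "c", 0), (a, "a1", "b1", "c1", 1),
        (a, "a2", "b2", "c2", 2)
      )
      case (a, _) => Seq((a, "a", "b", "c", 0))
    }.toDF("A", "D", "E", "F", "G")  // name the columns as in the question's output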
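
For reference, the flatMap approach above might look like this as a complete program. This is only a sketch under a few assumptions: the object name FlatMapInsteadOfExplode, the helper expand, and the local[*] master are made up for illustration and are not part of the original question or answer. The helper holds the same logic as the question's UDF, but as a plain Scala function, so no array column is built and no explode is needed.

```scala
import org.apache.spark.sql.SparkSession

object FlatMapInsteadOfExplode {

  // Hypothetical helper: the logic of the question's UDF as a plain function.
  def expand(b: String): Seq[(String, String, String, Int)] =
    if (b == "1") Seq(("a", "b", "c", 0), ("a1", "b1", "c1", 1), ("a2", "b2", "c2", 2))
    else Seq(("a", "b", "c", 0))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for experimenting; use your cluster settings instead.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flatMap-instead-of-explode")
      .getOrCreate()
    import spark.implicits._ // encoders for tuples, toDF

    val df = Seq(("a", "1"), ("b", "2")).toDF("A", "B")

    // One output row per tuple returned by expand, with A carried along.
    val result = df.as[(String, String)]
      .flatMap { case (a, b) => expand(b).map { case (d, e, f, g) => (a, d, e, f, g) } }
      .toDF("A", "D", "E", "F", "G")

    result.show()
    spark.stop()
  }
}
```

Keeping the per-row logic in an ordinary function also makes it easy to unit test without starting Spark. Whether this is faster than explode on your data is something to measure rather than assume.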
    
