Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.1k views
in Technique[技术] by (71.8m points)

scala - DataFrame explode list of JSON objects

I have JSON data in the following format:

{
     "date": 100
     "userId": 1
     "data": [
         {
             "timeStamp": 101,
             "reading": 1
         },
         {
             "timeStamp": 102,
             "reading": 2
         }
     ]
 }
 {
     "date": 200
     "userId": 1
     "data": [
         {
             "timeStamp": 201,
             "reading": 3
         },
         {
             "timeStamp": 202,
             "reading": 4
         }
     ]
 }

I read it into Spark SQL:

val df = SQLContext.read.json(...)
df.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- data: array (nullable = true)
//  |     |-- element: struct (containsNull = true)
//  |     |    |-- timeStamp: double (nullable = true)
//  |     |    |-- reading: double (nullable = true)

I would like to transform it in order to have one row per reading. To my understanding, every transformation should produce a new DataFrame, so the following should work:

import org.apache.spark.sql.functions.explode
val exploded = df
    .withColumn("reading", explode(df("data.reading")))
    .withColumn("timeStamp", explode(df("data.timeStamp")))
    .drop("data")
exploded.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- timeStamp: double (nullable = true)
//  |-- reading: double (nullable = true)

The resulting schema is correct, but I get every value twice:

exploded.show
// +-----------+-----------+-----------+-----------+
// |       date|     userId|  timeStamp|    reading|
// +-----------+-----------+-----------+-----------+
// |        100|          1|        101|          1|
// |        100|          1|        101|          1|
// |        100|          1|        102|          2|
// |        100|          1|        102|          2|
// |        200|          1|        201|          3|
// |        200|          1|        201|          3|
// |        200|          1|        202|          4|
// |        200|          1|        202|          4|
// +-----------+-----------+-----------+-----------+

My feeling is that there is something about the lazy evaluation of the two explodes that I don't understand.

Is there a way to get the above code to work? Or should I use a different approach all together?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

The resulting schema is correct, but I get every value twice

While schema is correct the output you've provided doesn't reflect actual result. In practice you'll get Cartesian product of timeStamp and reading for each input row.

My feeling is that there is something about the lazy evaluation

No, it has nothing to do with lazy evaluation. The way you use explode is just wrong. To understand what is going on lets trace execution for date equal 100:

val df100 = df.where($"date" === 100)

step by step. First explode will generate two rows, one for 1 and one for 2:

val df100WithReading = df100.withColumn("reading", explode(df("data.reading")))

df100WithReading.show
// +------------------+----+------+-------+
// |              data|date|userId|reading|
// +------------------+----+------+-------+
// |[[1,101], [2,102]]| 100|     1|      1|
// |[[1,101], [2,102]]| 100|     1|      2|
// +------------------+----+------+-------+

The second explode generate two rows (timeStamp equal 101 and 102) for each row from the previous step:

val df100WithReadingAndTs = df100WithReading
  .withColumn("timeStamp", explode(df("data.timeStamp")))

df100WithReadingAndTs.show
// +------------------+----+------+-------+---------+
// |              data|date|userId|reading|timeStamp|
// +------------------+----+------+-------+---------+
// |[[1,101], [2,102]]| 100|     1|      1|      101|
// |[[1,101], [2,102]]| 100|     1|      1|      102|
// |[[1,101], [2,102]]| 100|     1|      2|      101|
// |[[1,101], [2,102]]| 100|     1|      2|      102|
// +------------------+----+------+-------+---------+

If you want correct results explode data and select afterwards:

val exploded = df.withColumn("data", explode($"data"))
  .select($"userId", $"date",
    $"data".getItem("reading"),  $"data".getItem("timestamp"))

exploded.show
// +------+----+-------------+---------------+
// |userId|date|data[reading]|data[timestamp]|
// +------+----+-------------+---------------+
// |     1| 100|            1|            101|
// |     1| 100|            2|            102|
// |     1| 200|            3|            201|
// |     1| 200|            4|            202|
// +------+----+-------------+---------------+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...