Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
401 views
in Technique[技术] by (71.8m points)

Writing data as JSON array with Spark Structured Streaming

I have to write data from Spark Structure streaming as JSON Array, I have tried using below code:

df.selectExpr("to_json(struct(*)) AS value").toJSON

which returns me DataSet[String], but unable to write as JSON Array.

Current Output:

{"name":"test","id":"id"}
{"name":"test1","id":"id1"}

Expected Output:

[{"name":"test","id":"id"},{"name":"test1","id":"id1"}]

Edit (moving comments into question):

After using proposed collect_list method I am getting

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

Then I tried something like this -

withColumn("timestamp", unix_timestamp(col("event_epoch"), "MM/dd/yyyy hh:mm:ss aa")) .withWatermark("event_epoch", "1 minutes") .groupBy(col("event_epoch")) .agg(max(col("event_epoch")).alias("timestamp")) 

But I don't want to add a new column.

question from:https://stackoverflow.com/questions/65840595/writing-data-as-json-array-with-spark-structured-streaming

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can use the SQL built-in function collect_list for this. This function collects and returns a set of non-unique elements (compared to collect_set which returns only unique elements).

From the source code for collect_list you will see that this is an aggregation function. Based on the requirements given in the Structured Streaming Programming Guide on Output Modes it is highlighted that the output modes "complete" and "updated" are supported for aggregations without a watermark.

enter image description here

As I understand from your comments, you do not wish to add watermark and new columns. Also, the error you are facing

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; 

reminds you to not use the output mode "append".

In the comments, you have mentioned that you plan to produce the results into a Kafka message. One big JSON Array as one Kafka value. The complete code could look like

val df = spark.readStream
  .[...] // in my test I am reading from Kafka source
  .load()
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "offset", "partition")
  // do not forget to convert you data into a String before writing to Kafka
  .selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value")

df.writeStream
  .format("kafka")
  .outputMode("complete")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .trigger(Trigger.ProcessingTime(10000))
  .start()
  .awaitTermination()

Given the key/value pairs (k1,v1), (k2,v2), and (k3,v3) as inputs you will get a value in the Kafka topic that contains all selected data as a JSON Array:

[{"key":"k1","value":"v1","offset":7,"partition":0}, {"key":"k2","value":"v2","offset":8,"partition":0}, {"key":"k3","value":"v3","offset":9,"partition":0}]

Tested with Spark 3.0.1 and Kafka 2.5.0.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...