I have a Spark 2.0 application that reads messages from Kafka using Spark Streaming (with spark-streaming-kafka-0-10_2.11).
Structured Streaming looks really cool, so I wanted to try migrating the code, but I can't figure out how to use it.
In regular streaming I used KafkaUtils.createDirectStream, and among the parameters I passed was the value deserializer.
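For context, my DStream setup looks roughly like this (ssc is my StreamingContext, and MyAvroValueDeserializer stands in for my custom Avro value deserializer, so those names are just illustrative):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  // my custom deserializer turns the raw bytes into my Avro-generated Obj
  "value.deserializer" -> classOf[MyAvroValueDeserializer],
  "group.id"           -> "test-group"
)

val stream = KafkaUtils.createDirectStream[String, Obj](
  ssc,
  PreferConsistent,
  Subscribe[String, Obj](Array("RED-test-tal4"), kafkaParams)
)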
In Structured Streaming the docs say that I should deserialize using DataFrame operations, but I can't figure out exactly what that means.
I looked at examples such as this example, but my Avro object in Kafka is quite complex and cannot simply be cast like the String in the example.
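For reference, the simple case from that example is just a string cast on the value column, roughly like this:

import spark.implicits._

// this works when the Kafka value is a UTF-8 string;
// my value is Avro-encoded binary, so this isn't enough
val strings = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "RED-test-tal4")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]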
So far I have tried this kind of code (which I saw in a different question here):
import spark.implicits._

val ds1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "RED-test-tal4")
  .load()

ds1.printSchema()
ds1.select("value").printSchema()

// this is the line that fails: value is BinaryType
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema)))

val query = ds2.writeStream
  .outputMode("append")
  .format("console")
  .start()
and I get "data type mismatch: cannot cast BinaryType to StructType(StructField(...".
How can I deserialize the value?
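Is a plain UDF that runs the Avro decoder over the binary value the right direction? Something like this sketch (SpecificDatumReader and DecoderFactory are from the Avro library; I return a String here only because a UDF can't return my generated Obj directly):

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.spark.sql.functions.udf
import spark.implicits._

// sketch: decode each Kafka value using the schema of my Avro-generated class Obj
val deserialize = udf { bytes: Array[Byte] =>
  val reader  = new SpecificDatumReader[Obj](Obj.getClassSchema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  // Obj itself isn't a Spark SQL type, so return its string form for now
  reader.read(null, decoder).toString
}

val ds2 = ds1.select(deserialize($"value").as("record"))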