There is nothing wrong with your code if it works in batch mode.
It is not enough to convert the source into a stream (by using readStream and load); the sink part has to be converted into a stream as well.
The error message you are getting is just reminding you to also look into the sink part. Your DataFrame final_df is actually a streaming DataFrame, which has to be started through start.
The Structured Streaming Guide gives you a good overview of all available output sinks; the easiest option is to print the result to the console.
To summarize, you need to add the following to your program:
final_df.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()