Your idea works perfectly fine.
In fact, collecting your DataFrame to the driver is mandatory. The SparkSession is only usable on the driver, so you cannot create a distributed dataset by calling it from code that runs on the executors. Without the collect, you will end up with a NullPointerException.
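To make the point about collecting concrete, here is a minimal batch sketch of the same pattern. It is not taken from your code: the object name, the temp file and its single record are hypothetical, and it assumes a local Spark installation.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SparkSession}

object CollectThenRead {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .appName("collect-then-read")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: a one-record JSON file in a temp location.
    val file = Files.createTempFile("records", ".json")
    Files.write(file, """{"a":"1","b":"2"}""".getBytes("UTF-8"))

    // A distributed dataset of file locations, similar to a Kafka micro-batch.
    val paths: Dataset[String] = Seq(file.toString).toDS()

    // Anti-pattern (left commented out): this closure would run on the
    // executors, where the SparkSession cannot be used.
    // paths.foreach((p: String) => spark.read.json(p).show())

    // Working pattern: bring the (small) list of paths to the driver first,
    // then use the SparkSession there to build one DataFrame per file.
    paths.collect().foreach { p =>
      spark.read.json(p).show(false)
    }

    spark.stop()
  }
}
```

This only stays cheap because the collected data is a short list of paths, not the file contents themselves.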
I have slightly rewritten your code skeleton and also implemented the part that writes your DataFrame into a Delta table (based on your other question). In addition, I am using a Dataset[String] instead of a DataFrame (that is, a Dataset[Row]), which makes life a bit easier.
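As a rough illustration of why the typed variant is more convenient, the sketch below collects the same single-column data both ways. The object name and the two paths are made up for the example, and a local Spark session is assumed.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object TypedVsUntyped {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .appName("typed-vs-untyped")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical single-column DataFrame of file locations.
    val df: DataFrame = Seq("/tmp/a.json", "/tmp/b.json").toDF("data_path")

    // Untyped: collect() yields Array[Row], so each value is extracted by hand.
    val fromRows: Array[String] = df.collect().map((r: Row) => r.getString(0))

    // Typed: as[String] gives a Dataset[String], and collect() returns
    // Array[String] directly.
    val ds: Dataset[String] = df.as[String]
    val fromDataset: Array[String] = ds.collect()

    // Both approaches carry the same values.
    assert(fromRows.sorted.sameElements(fromDataset.sorted))

    spark.stop()
  }
}
```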
Using Spark 3.0.1 with delta-core 0.7.0 works fine. As an example my test file looks like
I sent the location of that file to a Kafka topic called "test" and used the following code to parse the file and write its columns (based on a given schema) into a Delta table:
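import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder()
  .appName("KafkaConsumer") // name and local master are placeholders for a local test
  .master("local[*]")
  .getOrCreate()
val jsonSchema = new StructType()
  .add("a", StringType)
  .add("b", StringType)
val deltaPath = "file:///tmp/spark/delta/test"
import spark.implicits._
// each Kafka message value is the location of a file to process
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING) as data_path")
  .as[String]
kafkaDf.writeStream.foreachBatch((batchDf: Dataset[String], batchId: Long) => {
  // collect to driver
  val records = batchDf.collect()
  // create dataframe based on file location and process and write to Delta-Lake
  records.foreach((path: String) => {
    // assumes JSON files, as the name jsonSchema suggests
    val dfToProcess = spark.read.schema(jsonSchema).json(path)
    dfToProcess.show(false) // replace this line with your custom processing logic
    // append so that every processed file adds to the same table
    dfToProcess.write.format("delta").mode("append").save(deltaPath)
  })
}).start()
spark.streams.awaitAnyTermination()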
The output of the show call is as expected:
|a |b |
and the data has been written as a Delta table to the location specified by deltaPath:
/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x 595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x 16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc
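If you want to double-check the result from Spark rather than from the file system, something along these lines should work. It is only a sketch: the object name is hypothetical, it assumes delta-core is on the classpath, and it reuses the deltaPath value from above.

```scala
import org.apache.spark.sql.SparkSession

object ReadDeltaTable {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .appName("read-delta")
      .master("local[*]")
      .getOrCreate()

    // Same location the streaming job writes to.
    val deltaPath = "file:///tmp/spark/delta/test"

    // Load the Delta table as a regular batch DataFrame and inspect it.
    val table = spark.read.format("delta").load(deltaPath)
    table.printSchema()
    table.show(false)

    spark.stop()
  }
}
```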