Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

apache kafka - Consume Confluent Avro Format in Spark standalone cluster?

I have the following simple Kafka consumer that works with Confluent Kafka, the Schema Registry, and Avro-based topics. It works as expected with local[*]. However, when I submit it to a standalone cluster (Spark 3.0.1) using the following command:

/spark/bin/spark-submit \
        --class ${SPARK_APPLICATION_MAIN_CLASS} \
        --master ${SPARK_MASTER_URL} \
        ${SPARK_SUBMIT_ARGS} \
        ${SPARK_APPLICATION_JAR_LOCATION} ${SPARK_APPLICATION_ARGS}

where

      - SPARK_MASTER_NAME=spark-master
      - SPARK_MASTER_PORT=7077
      - SPARK_APPLICATION_MAIN_CLASS=xxxx.AvroConsumer
      - SPARK_APPLICATION_ARGS=-m spark://spark-master:7077 -b http://ipaddr:9092 -s http://ipaddr:8081 -t topic
      - SPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

I get the following error:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 172.22.0.3, executor 0): org.apache.spark.SparkException: Failed to execute user defined function(AvroConsumer$$$Lambda$571/434329437: (binary) => string)

The build.sbt:

import _root_.sbt.Keys._
import _root_.sbt.Resolver
import _root_.sbt._

lazy val root = (project in file("."))
  .settings(
    name := "spark-examples",
    organization := "xxxx",
    version := "1.0",
    scalaVersion := "2.12.11",
    mainClass in Compile := Some("xxxx")
  )

libraryDependencies ++= {
  val spark = "org.apache.spark"
  val sparkV = "3.0.1"

  val confluent = "io.confluent"
  val confluentV = "6.0.0"

  Seq(
    spark %% "spark-core"                 % sparkV,
    spark %% "spark-sql"                  % sparkV,
    spark %% "spark-hive"                 % sparkV,
    spark %% "spark-streaming"            % sparkV,
    spark %% "spark-streaming-kafka-0-10" % sparkV,
    spark %% "spark-sql-kafka-0-10"       % sparkV,
    spark %% "spark-avro"                 % sparkV,
    spark %% "spark-mllib"                % sparkV,
    confluent % "kafka-avro-serializer"   % confluentV,
    confluent % "kafka-schema-registry-client" % confluentV,
    "org.apache.logging.log4j" % "log4j-api" % "2.14.0",
    "com.typesafe.play" %% "play-json" % "2.9.2",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.1",
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.12.1",
    "com.github.scopt" %% "scopt" % "4.0.0", // OptionsParser
    "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()
  //    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
//    "xxxx" %% "spark-testing-base" % "0.0.1" % "test"
  )
}

resolvers ++= Seq(
  "Confluent Repository" at "https://packages.confluent.io/maven/",
  "Mulesoft Repository" at "https://repository.mulesoft.org/nexus/content/repositories/public/"
)

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

And the Consumer code:

// https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry
// https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala#L28-L39
package xxxx

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

/**
 * Consumer example with Spark dataframes and Confluent Kafka.
 */
object AvroConsumer {
  private var schemaRegistryClient: SchemaRegistryClient = _
  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false): String = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)
    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(AvroConsumer.getClass.getName)
      .master(master)
      .getOrCreate()
    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

    spark.sparkContext.setLogLevel("ERROR")

    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
//      .option("startingOffsets", "earliest")
      .option("startingOffsets", "latest")
      .load()
    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")

    import org.apache.spark.sql.functions._

    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }
    val formattedDataFrame = valueDataFrame.select(
      from_json(col("message"), dfValueSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")

    formattedDataFrame
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

  object DeserializerWrapper {
    val deserializer: AvroDeserializer = kafkaAvroDeserializer
  }

  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }
  }

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options
    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)
    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)
    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)
    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)
    val parser = new BasicParser
    parser.parse(options, args)
  }

}

What exactly is causing this error, given that the same code runs locally without any problem?

Question from: https://stackoverflow.com/questions/65906240/consume-confluent-avro-format-in-spark-standalone-cluster


1 Reply

Waiting for answers
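
Not a confirmed answer, but one likely cause worth checking in the code from the question: schemaRegistryClient and kafkaAvroDeserializer are vars on the AvroConsumer object that are assigned only inside main, and main runs on the driver. With local[*] the driver and the tasks share one JVM, so the UDF sees the assigned value. On a standalone cluster each executor JVM initializes AvroConsumer on its own and never runs main, so DeserializerWrapper.deserializer would be null there and the UDF would fail (typically a NullPointerException further down the stack trace). Below is a minimal sketch of one way around this, assuming the registry URL is captured by the closure as a plain String and the deserializer is built lazily inside each JVM. ExecutorSideDeserializer, RecordDeserializer, UdfRegistration and registerDeserializeUdf are hypothetical names that do not appear in the original code, and the sketch has not been run against a real cluster.

```scala
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

// Hypothetical helper: holds at most one deserializer per JVM (driver or executor).
// Nothing here is serialized; each JVM builds its own instance on first use.
object ExecutorSideDeserializer {

  private final class RecordDeserializer(registryUrl: String)
      extends AbstractKafkaAvroDeserializer {
    // Same cache capacity (128) as in the question's code.
    this.schemaRegistry = new CachedSchemaRegistryClient(registryUrl, 128)

    // Decodes a Confluent-framed Avro payload and renders the record as a JSON string.
    def toJson(bytes: Array[Byte]): String =
      deserialize(bytes).asInstanceOf[GenericRecord].toString
  }

  private var cached: RecordDeserializer = _

  // Null payloads (e.g. tombstones) are passed through without touching the registry.
  // The whole call is synchronized for simplicity, not for throughput.
  def toJson(registryUrl: String, bytes: Array[Byte]): String =
    if (bytes == null) null
    else this.synchronized {
      if (cached == null) cached = new RecordDeserializer(registryUrl)
      cached.toJson(bytes)
    }
}

object UdfRegistration {
  // The closure captures only registryUrl (a String), which is safe to ship to executors.
  def registerDeserializeUdf(spark: SparkSession, registryUrl: String): Unit = {
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      ExecutorSideDeserializer.toJson(registryUrl, bytes))
  }
}
```

In main, this would stand in for the existing spark.udf.register call, for example UdfRegistration.registerDeserializeUdf(spark, schemaRegistryUrl). With this approach every executor also needs network access to the Schema Registry URL.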
