How can I run a PySpark Structured Streaming application on EMR?
I am submitting the job on EMR with:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 dwh.py
but it gets stuck. The same code works on my local machine.
I am reading from a Kafka topic and writing the stream to the console; I also tried writing the stream to another Kafka topic, but no luck.
Here is the snippet:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("UserOrders RDD kafka")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint("Checkpoint")

    spark = (SparkSession
             .builder
             .appName('kafka')
             .config("spark.eventLog.enabled", "false")
             .config("spark.driver.memory", "4g")
             .config("spark.executor.memory", "4g")
             .master("local[2]")
             .getOrCreate())
    spark.sparkContext.setLogLevel('ERROR')

    df = (spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("startingOffsets", "latest")
          .option("subscribe", "users")
          .load())

    castdf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    castdf.printSchema()

    # aggr_users is an aggregated DataFrame derived from castdf
    # (aggregation code omitted from this snippet)
    getmsg = (aggr_users
              .writeStream
              .trigger(processingTime='5 seconds')
              .outputMode("update")
              .format("console")
              .start())
    getmsg.awaitTermination()
    # result = (aggr_users.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
    #           .writeStream
    #           .format("kafka")
    #           .outputMode("append")
    #           .option("kafka.bootstrap.servers", "kafka:9092")
    #           .option("topic", "user_aggregate_demo")
    #           .option("checkpointLocation", "/tmp/vaquarkhan/checkpoint")
    #           .start())
    # result.awaitTermination()
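One thing I noticed while preparing this question: the hard-coded .master("local[2]") pins the job to two threads on whatever node runs the driver, rather than using the cluster. A sketch of what I understand to be the usual approach on EMR (dropping the master() call in code and letting spark-submit target YARN; the --master and --deploy-mode values here are standard YARN settings, not something I have confirmed fixes my case):

```shell
# Sketch: submit the same script to YARN on EMR instead of a local master.
# Package version and script name are taken from my current command;
# after removing .master("local[2]") from dwh.py itself.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
  dwh.py
```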
question from:
https://stackoverflow.com/questions/65644968/how-to-run-pyspark-structured-streaming-application-on-aws-emr