
python 3.x - Adding custom jars to pyspark in jupyter notebook

I am using a Jupyter notebook with PySpark, running from the following Docker image: Jupyter all-spark-notebook.

Now I would like to write a PySpark streaming application that consumes messages from Kafka. The Spark-Kafka Integration guide describes how to deploy such an application using spark-submit (it requires linking an external jar; the explanation is in section 3, "Deploying"). But since I am using a Jupyter notebook, I never actually run the spark-submit command; I assume it gets run in the background when I press execute.

The spark-submit command accepts several parameters, one of them being --jars, but it is not clear to me how I can set this parameter from the notebook (or externally, via environment variables?). I assume I can link this external jar dynamically via the SparkConf or the SparkContext object. Does anyone have experience with how to perform this linking properly from the notebook?
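
Something along these lines is what I had in mind, although I have not verified that setting spark.jars this way actually works in this image (the property name, app name, and jar path are my assumption):

import pyspark
from pyspark import SparkConf

# Hypothetical sketch: point spark.jars at a local copy of the Kafka assembly jar
# before the SparkContext is created. The path and property usage are assumptions.
conf = (SparkConf()
        .setAppName("kafka-streaming-test")
        .set("spark.jars", "/home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar"))
sc = pyspark.SparkContext(conf=conf)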


1 Reply


I've managed to get it working from within the Jupyter notebook that runs in the all-spark container.

I start a Python 3 notebook in JupyterHub and override the PYSPARK_SUBMIT_ARGS environment variable as shown below. The Kafka consumer library was downloaded from the Maven repository and placed in my home directory, /home/jovyan:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /home/jovyan/spark-streaming-kafka-assembly_2.10-1.6.1.jar '
    'pyspark-shell'
)

import pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# PYSPARK_SUBMIT_ARGS (set above) makes the Kafka assembly jar available here.
sc = pyspark.SparkContext()

# Streaming context with a 1-second batch interval
ssc = StreamingContext(sc, 1)

broker = "<my_broker_ip>"

# Direct (receiver-less) stream reading from the "test1" topic
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test1"],
                        {"metadata.broker.list": broker})

# Print the first elements of each batch, then start the streaming job
directKafkaStream.pprint()
ssc.start()

Note: Don't forget the trailing pyspark-shell in the environment variable!

Extension: If you want to include code from spark-packages, you can use the --packages flag instead. An example of how to do this in the all-spark-notebook can be found here.
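
As a minimal sketch of the --packages variant (the Maven coordinate below is an assumption based on the jar version used above, so adjust it for your Spark version):

import os

# Hypothetical sketch: resolve the Kafka integration from Maven via --packages
# instead of pointing --jars at a local file. Keep 'pyspark-shell' at the end.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.6.1 '
    'pyspark-shell'
)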
