Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
118 views
in Technique[技术] by (71.8m points)

java - How to run Spark code in Airflow?

Hello people of the Earth! I'm using Airflow to schedule and run Spark tasks. All I found by this time is python DAGs that Airflow can manage.
DAG example:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

The problem is I'm not good in Python code and have some tasks written in Java. My question is how to run Spark Java jar in python DAG? Or maybe there is other way yo do it? I found spark submit: http://spark.apache.org/docs/latest/submitting-applications.html
But I don't know how to connect everything together. Maybe someone used it before and has working example. Thank you for your time!

question from:https://stackoverflow.com/questions/39827804/how-to-run-spark-code-in-airflow

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You should be able to use BashOperator. Keeping the rest of your code as is, import required class and system packages:

from airflow.operators.bash_operator import BashOperator

import os
import sys

set required paths:

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

and add operator:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)

You can easily extend this to provide additional arguments using Jinja templates.

You can of course adjust this for non-Spark scenario by replacing bash_command with a template suitable in your case, for example:

bash_command = 'java -jar {{ params.jar }}'

and adjusting params.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...