The options that the SparkSubmitOperator in Airflow requires can be passed in as a dictionary. Keep in mind that the keys in the dictionary must match the parameter names of the operator's constructor.
Create the following two dictionaries:
base_config = {
    "task_id": "TEST",
    "conn_id": "test-conn",
    "application": "/home/test/test.py",
    # Keys must use underscores to match the operator's parameter names,
    # e.g. executor_memory rather than executor-memory.
    "executor_memory": "10G",
    "driver_memory": "10G",
    "executor_cores": 2,
    "principal": "test-host@test",
    "keytab": "/home/test-host.keytab",
    "env_vars": {"SPARK_MAJOR_VERSION": "2"},
}
spark_config = {
    "spark.master": "yarn",
    "spark.submit.deployMode": "client",
    "spark.yarn.queue": "test",
    "spark.dynamicAllocation.minExecutors": 5,
    "spark.dynamicAllocation.maxExecutors": 10,
    "spark.yarn.driver.memoryOverhead": 5120,
    "spark.driver.maxResultSize": "2G",
    "spark.yarn.executor.memoryOverhead": 5120,
    "spark.kryoserializer.buffer.max": "1000m",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC",
    "spark.network.timeout": "15000s",
    "spark.executor.heartbeatInterval": "1500s",
    "spark.task.maxDirectResultSize": "8G",
    "spark.ui.view.acls": "*",
}
# On Airflow 1.10.x, import from airflow.contrib.operators.spark_submit_operator instead.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

SparkSubmitOperator(**base_config, conf=spark_config)
This keeps your workflow configuration-driven: the operator arguments and the Spark settings live in plain dictionaries, separate from the DAG logic.
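To push the idea further, both dictionaries can be loaded from an external file, so Spark settings change without touching the DAG code. Below is a minimal sketch of that approach; the file path spark_job_config.json and the helper load_configs are illustrative assumptions, not part of the original setup.

import json

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

def load_configs(path):
    # Assumed file layout: a JSON object with two top-level keys,
    # "base_config" and "spark_config", mirroring the dictionaries above.
    with open(path) as f:
        config = json.load(f)
    return config["base_config"], config["spark_config"]

base_config, spark_config = load_configs("/home/test/spark_job_config.json")
SparkSubmitOperator(**base_config, conf=spark_config)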