It it not possible to modify the DAG during its execution (without a lot more work).
The dag = DAG(...
is picked up in a loop by the scheduler. It will have task instance 'python_operator'
in it. That task instance gets scheduled in a dag run, and executed by a worker or executor. Since DAG models in the Airflow DB are only updated by the scheduler these added dummy tasks will not be persisted to the DAG nor scheduled to run. They will be forgotten when the worker exits. Unless you copy all the code from the scheduler regarding persisting & updating the model… but that will be undone the next time the scheduler visits the DAG file for parsing, which could be happening once a minute, once a second or faster depending how many other DAG files there are to parse.
Airflow actually wants each DAG to approximately stay the same layout between runs. It also wants to reload/parse DAG files constantly. So though you could make a DAG file that on each run determines the tasks dynamically based on some external data (preferably cached in a file or pyc module, not network I/O like a DB lookup, you'll slow down the whole scheduling loop for all the DAGs) it's not a good plan as your graph and tree view will get all confusing, and your scheduler parsing will be more taxed by that lookup.
You could make the callable run each task…
def make_tasks(context):
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1.execute(context)
du2.execute(context)
du3.execute(context)
p = PythonOperator(
provides_context=true,
But that's sequential, and you have to work out how to use python to make them parallel (use futures?) and if any raise an exception the whole task fails. Also it is bound to one executor or worker so not using airflow's task distribution (kubernetes, mesos, celery).
The other way to work with this is to add a fixed number of tasks (the maximal number), and use the callable(s) to short circuit the unneeded tasks or push arguments with xcom for each of them, changing their behavior at run time but not changing the DAG.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…