I have created two dags which I want to trigger manually schedule_interval=None
.
First, I trigger "External_test_A" which should be pending till the "External_test_B" will not be triggered. After a while (after 2 minutes) I trigger "External_test_B" DAG, which runs only one task first_task
.
When the task: first_task
is success, then Poking for External_test_B.first_task
from "External_test_A", should finished, return success and run second_task
task from "External_test_A".
I stuck in the situation where even if first_task
is a success, the Poking for ...
from "External_test_A" is still going.
External_test_A:
default_args = {
'owner': 'airflow',
'start_date': pendulum.yesterday().astimezone('US/Eastern')
}
dag = DAG(
dag_id='External_test_A',
default_args=default_args,
schedule_interval=None
)
def do_second_task():
print('Second task is done')
sensor = ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='External_test_B',
external_task_id='first_task',
execution_delta=timedelta(minutes=3),
dag=dag)
t2 = PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
sensor >> t2
if __name__ == "__main__":
dag.cli()
External_test_B:
default_args = {
'owner': 'airflow',
'start_date': pendulum.yesterday().astimezone('US/Eastern')
}
dag = DAG(
dag_id='External_test_B',
default_args=default_args,
schedule_interval=None
)
t1 = DummyOperator(task_id='first_task', dag=dag)
t1
if __name__ == "__main__":
dag.cli()
Does some of you could tell me what I'm doing wrong? How to solve the problem with communication between two tasks from two different DAGs using only manual trigger?
question from:
https://stackoverflow.com/questions/65833817/airflow-externaltasksensor-manually-triggered 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…