I'm trying to use ExternalTaskSensor and it gets stuck at poking another DAG's task, which has already been successfully completed.
Here, a first DAG "a" completes its task and after that a second DAG "b" through ExternalTaskSensor is supposed to be triggered. Instead it gets stuck at poking for a.first_task.
First DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
Second DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >>
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
What am I missing here?
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…