Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
417 views
in Technique[技术] by (71.8m points)

python - Airflow ExternalTaskSensor gets stuck

I'm trying to use ExternalTaskSensor and it gets stuck at poking another DAG's task, which has already been successfully completed.

Here, a first DAG "a" completes its task and after that a second DAG "b" through ExternalTaskSensor is supposed to be triggered. Instead it gets stuck at poking for a.first_task.

First DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)

Second DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> 
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)

What am I missing here?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

ExternalTaskSensor assumes that you are dependent on a task in a dag run with the same execution date.

This means that in your case dags a and b need to run on the same schedule (e.g. every day at 9:00am or w/e).

Otherwise you need to use the execution_delta or execution_date_fn when you instantiate an ExternalTaskSensor.

Here is the documentation inside the operator itself to help clarify further:

:param execution_delta: time difference with the previous execution to
    look at, the default is the same execution_date as the current task.
    For yesterday, use [positive!] datetime.timedelta(days=1). Either
    execution_delta or execution_date_fn can be passed to
    ExternalTaskSensor, but not both.

:type execution_delta: datetime.timedelta


:param execution_date_fn: function that receives the current execution date
    and returns the desired execution date to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.

:type execution_date_fn: callable

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...