The said behaviour can be achieved by introducing a task that forces a delay of specified duration between your Task 1
and Task 2
This can be achieved using PythonOperator
import time
from airflow.operators.python_operator import PythonOperator
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
dag=my_dag,
python_callable=lambda: time.sleep(300))
task_1 >> delay_python_task >> task_2
Or using BashOperator
as well
from airflow.operators.bash_operator import BashOperator
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
dag=my_dag,
bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2
Note: The given code-snippets are NOT tested
References
UPDATE-1
Here are some other ways of introducing delay
on_success_callback
/ on_failure_callback
: Depending of whether Task 2
is supposed to run upon success or failure of Task 1
, you can pass lambda: time.sleep(300)
in either of these params of Task 1
pre_execute()
/ post_execute()
: Invoking time.sleep(300)
in Task 1
's post_execute()
or Task 2
's pre_execute()
would also have the same effect. Of course this would involve modifying code for your tasks
(1 or 2) so better avoid it
Personally I would prefer the extra task
approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1
or Task 2
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…