You could use params, a dictionary that can be defined at DAG level and remains accessible in every task. It works for every operator derived from BaseOperator and can also be set from the UI.
The following example shows how to use it with different operators. params can be defined in the default_args dict or passed directly as an argument to the DAG object (a sketch of the latter follows the DAG definition below).
default_args = {
    "owner": "airflow",
    "params": {
        "param1": "first_param",
        "param2": "second_param",
    },
}
dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False,
)
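As mentioned above, instead of going through default_args you can also pass params directly to the DAG object. A minimal sketch of the equivalent definition for this example (assuming the rest of the DAG arguments stay unchanged):
dag = DAG(
    dag_id="example_dag_params",
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False,
    # same dictionary as before, attached directly to the DAG instead of via default_args
    params={
        "param1": "first_param",
        "param2": "second_param",
    },
)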
When triggering this DAG from the UI you can add an extra param (param3 in this example) in the trigger configuration; that is where the param_from_the_UI value in the outputs below comes from.
Params can be accessed in templated fields, as in the BashOperator case:
with dag:
    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo bash_task: {{ params.param1 }}',
    )
    bash_task
logs output:
{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0
Params are also accessible from the execution context, for example in the python_callable of a PythonOperator:
def _print_params(**kwargs):
    print(f"Task_id: {kwargs['ti'].task_id}")
    for k, v in kwargs['params'].items():
        print(f"{k}:{v}")

python_task = PythonOperator(
    task_id='python_task',
    python_callable=_print_params,
)
Output:
{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
You can also add params at the task level, in the operator definition:
python_task_2 = PythonOperator(
    task_id='python_task_2',
    python_callable=_print_params,
    params={'param4': 'param defined at task level'},
)
Output:
{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Following the example, you could also define a custom operator that inherits from BaseOperator:
class CustomDummyOperator(BaseOperator):

    @apply_defaults
    def __init__(self, custom_arg: str = 'default', *args, **kwargs) -> None:
        self.custom_arg = custom_arg
        super(CustomDummyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(f"Task_id: {self.task_id}")
        # print the custom argument as well (see the output below)
        print(f"custom_arg: {self.custom_arg}")
        for k, v in context['params'].items():
            print(f"{k}:{v}")
And the task example would be:
custom_op_task = CustomDummyOperator(
    task_id='custom_operator_task',
)
Output:
{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Imports:
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
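For completeness: chain is imported above but not used in the snippets. Presumably it is meant to set the dependencies between the tasks inside the with dag: block; the ordering below is only an illustrative assumption, not part of the original example:
with dag:
    # ... task definitions from the snippets above ...
    # chain() wires them up sequentially: bash_task >> python_task >> python_task_2 >> custom_op_task
    chain(bash_task, python_task, python_task_2, custom_op_task)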
I hope that works for you!