I want to customize my DAG to call a datarbicks notebook when it is success or failure. I have created two different functions to call a databricks notebook based on the success/failure cases. success or failure callback function is calling but databricsks notebook is not executing. here is the sample code.
def task_success_callback(context):
""" task_success callback """
context['task_instance'].task_id
print("success case")
dq_notebook_success_task_params = {
'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
'notebook_task': {
'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
'base_parameters': {
"root": "dbfs:/mnt/aaa",
"audit_file_path": "/success_file_path/",
"table_name": "sample_data_table",
"audit_flag": "success"
}
}
}
DatabricksSubmitRunOperator(
task_id="weather_table_task_id",
databricks_conn_id='databricks_conn',
json=dq_notebook_success_task_params,
do_xcom_push=True,
secrets=[secret.Secret(
deploy_type='env',
deploy_target=None,
secret='adf-service-principal'
), secret.Secret(
deploy_type='env',
deploy_target=None,
secret='postgres-credentials',
)],
)
def task_failure_callback(context):
""" task_success callback """
context['task_instance'].task_id
print("failure case")
dq_notebook_failure_task_params = {
'existing_cluster_id': Variable.get("DATABRICKS_CLUSTER_ID"),
'notebook_task': {
'notebook_path': '/AAA/Airflow/Operators/audit_file_operator',
'base_parameters': {
"root": "dbfs:/mnt/aaa",
"audit_file_path": "/failure_file_path/",
"table_name": "sample_data_table",
"audit_flag": "failure"
}
}
}
DatabricksSubmitRunOperator(
task_id="weather_table_task_id",
databricks_conn_id='databricks_conn',
json=dq_notebook_failure_task_params,
do_xcom_push=True,
secrets=[secret.Secret(
deploy_type='env',
deploy_target=None,
secret='adf-service-principal'
), secret.Secret(
deploy_type='env',
deploy_target=None,
secret='postgres-credentials',
)],
)
DEFAULT_ARGS = {
"owner": "admin",
"depends_on_past": False,
"start_date": datetime(2020, 9, 23),
"on_success_callback": task_success_callback,
"on_failure_callback": task_failure_callback,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(seconds=10),
}
==================
Remaining DAG code
==================
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…