Per @Juan Riza's suggestion, I checked out this link: Proper way to create dynamic workflows in Airflow. That was pretty much the answer, although I was able to simplify the solution enough that I thought I would offer my own modified version of the implementation here:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

# The worker module lives outside the DAG folder, so its location is passed in
# through an environment variable and added to the import path at parse time.
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

ENV = os.environ

default_args = {
    # 'start_date': datetime.now(),
    'start_date': datetime(2017, 7, 18)
}

dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args)

# Single upstream task that wipes the tables before they are re-built
clear_tables = PythonOperator(
    task_id='clear_tables',
    python_callable=dash_workers.clear_db,
    dag=dag)

def id_worker(uid):
    # Build one preprocessing task per user ID; the uid doubles as the task_id
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=dag)

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)
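For context, dash_workers itself isn't shown here; the DAG above only assumes it exposes three callables, roughly along these lines (names taken from the snippet, bodies are placeholders):

# dash_workers.py -- assumed interface, inferred from the DAG above
def clear_db():
    """Clear the tables that the preprocessing run will re-build."""
    pass

def get_id_creds():
    """Return an iterable of user IDs to preprocess, e.g. ['uid_1', 'uid_2']."""
    return []

def main_preprocess(uid):
    """Run the preprocessing job for a single user ID."""
    pass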
clear_tables cleans the database that will be re-built as a result of the process. id_worker is a function that dynamically generates new preprocessing tasks based on the array of ID values returned from get_id_creds. The task ID is just the corresponding user ID, though it could easily have been an index, i, as in the example mentioned above.
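If the raw user IDs are awkward as task IDs, a sketch of that index-based variant of the loop above would look something like this (id_worker_{} is just an arbitrary naming choice for illustration):

# Sketch: use an enumerated index for the task_id instead of the raw uid
for i, uid in enumerate(dash_workers.get_id_creds()):
    worker = PythonOperator(
        task_id='id_worker_{}'.format(i),
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=dag)
    clear_tables >> worker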
NOTE: That bitshift operator (>>) looked backwards to me at first, since the clear_tables task should come first, but it is what works here: clear_tables >> id_worker(uid) does run clear_tables before the worker task.
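For what it's worth, the bitshift operators are just sugar over set_downstream/set_upstream, so all of the following declare the same dependency (clear_tables runs first):

worker = id_worker(uid)
clear_tables >> worker                 # clear_tables runs, then worker
worker << clear_tables                 # same dependency, read right-to-left
clear_tables.set_downstream(worker)    # the explicit method behind >>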