dynamic - Task instances created dynamically are being marked as Removed when I generate tasks using a for loop

Please find the code below, with which we construct the DAG, but the dynamically created tasks end up in the Removed state.

In the code below we fetch a list of blobs and then, based on that list, dynamically create a Dataflow template caller task for each entry, passing in the corresponding input file as a parameter. Each dynamically created task (one Dataflow template call) therefore handles exactly one input file.

The DAG code:

    # Imports needed by the code below (Airflow 1.10 / Cloud Composer style paths).
    import datetime
    import json
    import os
    from datetime import date
    from airflow import models
    from airflow.operators import dummy_operator
    from airflow.operators.python_operator import PythonOperator
    from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
    from google.cloud import storage

    # default_args is referenced below but not shown in the post; a minimal placeholder is assumed.
    default_args = {'start_date': datetime.datetime(2021, 2, 1)}

    list_of_blobs = []

    # Read the comma-separated blob list written by read_config() and populate
    # the module-level list that is used to generate tasks at DAG parse time.
    def read_text_file(config_file_path):
        if os.path.exists(config_file_path):
            try:
                with open(config_file_path, 'r', encoding='utf-8') as f:
                    configuration = f.read()
                    con = configuration.split(',')
                    global list_of_blobs
                    for c in con:
                        list_of_blobs.append(c)
            except IOError as e:
                print(e)
    # Download the JSON configuration from GCS, push its values to XCom,
    # then list the day's source blobs and write their names to a local file.
    def read_config(**kwargs):
        today = date.today()
        bucket = "mystoragebucket"
        blob_name = "config/configuration_r2c.json"
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(str(bucket))
        blob = bucket.blob(str(blob_name))
        downloaded_blob = blob.download_as_string()
        data = json.loads(downloaded_blob.decode("utf-8"))
        kwargs['ti'].xcom_push(key='input_file', value=data['input_file'])
        kwargs['ti'].xcom_push(key='delimiter', value=data['delimiter'])
        kwargs['ti'].xcom_push(key='cols', value=data['cols'])
        kwargs['ti'].xcom_push(key='p_output_file', value=data['p_output_file'])
        kwargs['ti'].xcom_push(key='b_output_file', value=data['b_output_file'])
        kwargs['ti'].xcom_push(key='Cleansed_Bucket_FolderPath', value=data['Cleansed_Bucket_FolderPath'])
        kwargs['ti'].xcom_push(key='GCSSourceFolder', value=data['GCSSourceFolder'])
        kwargs['ti'].xcom_push(key='File_name', value=data['File_name'])
        kwargs['ti'].xcom_push(key='Cleansed_Bucket_Name', value=data['Cleansed_Bucket_Name'])
        kwargs['ti'].xcom_push(key='Source_Raw_Bucket_Name', value=data['Source_Raw_Bucket_Name'])
        kwargs['ti'].xcom_push(key='BigQueryTargetTableName', value=data['BigQueryTargetTableName'])
        source_bucket = storage_client.get_bucket(data['Source_Raw_Bucket_Name'])
        folder = 'processing' + '/' + data['GCSSourceFolder'] + '/' + data['File_name'] + '/' + str(today.year) + '/' + str(today.month) + '/' + str(today.day)
        blobs = source_bucket.list_blobs(prefix=folder)
        
        print(blobs)
        # Build a comma-separated list of the data files found under the prefix.
        blob_list = ''
        i = 0
        for blob in blobs:
            if blob.name.endswith('.csv') or blob.name.endswith('.dat') or blob.name.endswith('.gz'):
                if i == 0:
                    blob_list = blob_list + blob.name
                else:
                    print(blob.name)
                    blob_list = blob_list + ',' + blob.name
                i = i + 1  # increment so subsequent names are comma-separated
        try:
            with open("/home/airflow/gcs/data/blob_list.txt", "wt") as fout:
                fout.write(blob_list)
        except Exception as e:
            print(e)
        
        return data

    with models.DAG(
        # The id you will see in the DAG airflow page
        "raw_to_cleansed_example_4feb",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
        ) as dag:
        start = dummy_operator.DummyOperator(
            task_id='start',
            trigger_rule='all_success'
        )
        
        end = dummy_operator.DummyOperator(
            task_id='end',
            trigger_rule='all_success'
        )
        # Factory that returns one DataflowTemplateOperator per input file.
        def dataflow_call(filename, i):
            return DataflowTemplateOperator(
                    # The task id of the dynamically generated job
                    task_id='dataflow_dynamic_{}'.format(str(i)),
                    template="gs://mystoragebucket/template/dataflowTemplate",
                    parameters={
                        "input_file":'gs://' + "{{ task_instance.xcom_pull(task_ids='get_file_name',key='Source_Raw_Bucket_Name') }}" + '/' + filename,
                        "delimiter":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='delimiter') }}",
                        "no_of_cols":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='cols') }}",
                        "p_output_file":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='p_output_file') }}",
                        "b_output_file":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='b_output_file') }}",
                   },
                )
        py_op = PythonOperator(
            task_id='get_file_name',
            provide_context=True,
            python_callable=read_config)
        
        
        start >> py_op

        # The blob list is read at DAG parse time; one Dataflow task is created per blob.
        read_text_file('/home/airflow/gcs/data/blob_list.txt')

        for i, b in enumerate(list_of_blobs):
            py_op >> dataflow_call(b, i) >> end
Question from: https://stackoverflow.com/questions/66058240/tasks-instances-dynamically-created-are-being-marked-as-removedwhen-i-am-dynamic

1 Reply


This now stands solved: a job id parameter was missing.
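
For reference, a minimal sketch of what the fix might look like, assuming the missing piece was a unique job identifier for each dynamically generated Dataflow call. The job_name argument and the naming scheme below are illustrative assumptions, not taken from the original post; verify the parameter name for your Airflow version.

    # Sketch only: give each dynamically generated Dataflow call its own job name.
    from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

    def dataflow_call(filename, i):
        return DataflowTemplateOperator(
            task_id='dataflow_dynamic_{}'.format(i),
            template="gs://mystoragebucket/template/dataflowTemplate",
            # Unique job identifier per dynamic task; the name pattern here is an assumption.
            job_name='raw-to-cleansed-{}'.format(i),
            parameters={
                "input_file": 'gs://'
                    + "{{ task_instance.xcom_pull(task_ids='get_file_name', key='Source_Raw_Bucket_Name') }}"
                    + '/' + filename,
                "delimiter": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='delimiter') }}",
            },
        )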

