【Question title】: How to build a dynamic workflow in an Airflow DAG with multiple for loops?
【Posted】: 2020-03-26 14:44:35
【Question】:

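I am creating a dynamic DAG with multiple 'for' loops. The flow starts correctly, but the downstream connections are not what I expect. The task "dummy_ender_0_a" is connected to "toto_a" as expected, but I expected "dummy_ender_1_a" and "dummy_ender_2_a" to be connected to the downstream task "toto_a" as well. I am not sure what I am missing here. The code is below: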

import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator

ENV = 'DEV'
META_DELTA_DAYS=3
tables=['a','b','c','d']

with models.DAG(
        'test_'+ENV,
        schedule_interval = None ,
        concurrency = 1 ,
        max_active_runs = 1,
        catchup = False, # for now manually catchup
        default_args={
            "start_date": datetime.datetime.strptime('2019-09-01 23:10:00', '%Y-%m-%d %H:%M:%S'),
            "retries": 1,
            "retry_delay": datetime.timedelta(minutes=1),
            "execution_timeout": datetime.timedelta(minutes=14), # safety check
            # notifications
            "email_on_failure": True,
            "email_on_retry": False,
            'email': [models.Variable.get('alert_email')],
            "project_id": models.Variable.get('project'),
            'provide_context': True
        }
) as dag:

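    def parsing_ts(**kwargs):
        # ts_nodash looks like '20190901T231000'; drop the 'T' and the seconds.
        etl_run_time_nodash = kwargs['ts_nodash'].replace('+','').replace('T','')[:-2]
        # execution_date is the run's logical date from the task context.
        etl_run_time  = datetime.datetime.fromtimestamp(kwargs['execution_date'].timestamp())
        print('>>> START DATE >>>>',etl_run_time_nodash)
        # Push the value that printval() pulls later.
        kwargs['ti'].xcom_push(key='etl_run_time_nodash', value=etl_run_time_nodash)
        # Push year/month/day for each of the last META_DELTA_DAYS days.
        for i,dt in enumerate([etl_run_time - datetime.timedelta(days=x) for x in range(META_DELTA_DAYS)]):
            kwargs['ti'].xcom_push(key='curr_yr_minus_'+str(i), value=dt.year)
            kwargs['ti'].xcom_push(key='curr_mth_minus_'+str(i), value=dt.month)
            kwargs['ti'].xcom_push(key='curr_day_minus_'+str(i), value=dt.day)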





    def printval(**kwargs):
        etl_run_time_nodash = kwargs['ti'].xcom_pull(key='etl_run_time_nodash')
        print('>>> START DATE NO DASH>>>>',etl_run_time_nodash)



    def start(table,**kwargs):
        return DummyOperator(task_id = "dummy_start_%s"%(table) )

    def starter(i,table,**kwargs):
        return DummyOperator(task_id = "dummy_starter_%s_%s"%(i,table) )

    def ender(i,table,**kwargs):
        return DummyOperator(task_id = "dummy_ender_%s_%s"%(i,table) )

    def toto(table,**kwargs):
        return DummyOperator(task_id = "toto_%s"%(table) )

    def end(**kwargs):
        return DummyOperator(task_id = "dummy_end")

    parsing_ts_op = python_operator.PythonOperator(
        task_id='parsing_ts_operator',
        provide_context=True,
        python_callable=parsing_ts)

    printval_op = python_operator.PythonOperator(
        task_id='printval_operator',
        provide_context=True,
        python_callable=printval)

    for table in tables:
        parsing_ts_op >> start(table)
        for i in range(META_DELTA_DAYS):
            start(table) >> starter(i,table)
            starter(i,table) >> ender(i,table)
            ender(i,table) >> toto(table)
        toto(table) >> printval_op
    printval_op >> end()

[Image: The DAG flow]

【Comments】:

    Tags: google-cloud-platform airflow google-cloud-composer


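    【Solution 1】:

    Thanks to Kris Geusebroek, I was able to solve this. He suggested creating toto(table) before the inner loop and assigning it to a variable that is then used inside the inner loop. Here is how I changed the loop: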

    for table in tables:
        d = toto(table)
        parsing_ts_op >> start(table)
        for i in range(META_DELTA_DAYS):
            start(table) >> starter(i,table)
            starter(i,table) >> ender(i,table)
            ender(i,table) >> d
        d >> printval_op
    printval_op >> end()
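
To illustrate why reusing the variable helps, here is a minimal sketch that does not use Airflow at all. The `Task` class is a hypothetical stand-in (an assumption for illustration, not the Airflow API) that only records which tasks were placed upstream of it with `>>`. The point it shows is plain Python behaviour: every call to a factory function such as `toto(table)` builds a new object, so dependencies set on one call's result are not visible on another call's result.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (not the Airflow API).

    It only records which tasks were wired upstream of it via `>>`.
    """

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = []

    def __rshift__(self, other):
        # Mimic `a >> b`: register a as upstream of b, return b for chaining.
        other.upstream_ids.append(self.task_id)
        return other


def ender(i, table):
    return Task("dummy_ender_%s_%s" % (i, table))


def toto(table):
    return Task("toto_%s" % table)


def wire_calling_factory_each_time(table, days):
    """Pattern from the question: toto(table) is called on every iteration."""
    targets = []
    for i in range(days):
        targets.append(ender(i, table) >> toto(table))
    return targets


def wire_reusing_one_object(table, days):
    """Pattern from the answer: toto(table) is created once and reused."""
    d = toto(table)
    for i in range(days):
        ender(i, table) >> d
    return d


separate = wire_calling_factory_each_time("a", 3)
shared = wire_reusing_one_object("a", 3)

# Three distinct objects, each holding a single upstream task.
print([t.upstream_ids for t in separate])
# One object that collected all three upstream tasks.
print(shared.upstream_ids)
```

How Airflow itself treats several operator objects that share one task_id depends on the version, so this sketch only models the object-identity part of the problem. The same reasoning would suggest creating start(table), starter(i, table) and ender(i, table) once each and reusing those variables too, although the answer above only reports changing toto(table).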
    
