【问题标题】:Organize DAGs tasks dependencies in airflow在气流中组织 DAG 任务依赖项
【发布时间】:2021-03-28 04:03:01
【问题描述】:

我有一个虚拟任务和一个使用循环并行运行的任务列表,对于一个任务,我希望再有一个串行任务。

到目前为止我尝试过的是:

dummy = DummyOperator(task_id='Dummy'.upper(),
                                              dag=dag)


final = DummyOperator(task_id='FinalTask'.upper(),
                                              dag=dag)


for task in ['Task1', 'Task2', 'Task3']:

    if task == 'Task1'
            task1 = DummyOperator(task_id='Task1-a'.upper(),
                                              dag=dag)

        else:
             ...
    else:
        ...

    tasks = DummyOperator(task_id=task),
                                      dag=dag)

    dummy >> tasks
    tasks >> task1
    tasks >> final

【问题讨论】:

    标签: python-3.x airflow-scheduler airflow


    【解决方案1】:

    你没有解释我们如何知道Task1的子任务的逻辑。

    这应该构建您想要的结构:

    tasks = ['Task1', 'Task2', 'Task3']
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2020, 12, 17),
    }
    
    with DAG(
        dag_id='dummyplay2',
        default_args=default_args,
        schedule_interval=None,
    ) as dag:
        start_op = DummyOperator(task_id='start')
        final_op = DummyOperator(task_id='final')
        for task in tasks:
            task_op = DummyOperator(task_id=task)
            start_op >> task_op
            if task == 'Task1':
                #This loop creates the sub task logic.
                #You can replace ord('b') with ord('z) and it will create more sub tasks
                for i in range(ord('a'), ord('b')+1):
                    sub_task_op = DummyOperator(task_id=f'{task}_{chr(i)}')
                    task_op >> sub_task_op >> final_op
            else:
                task_op >> final_op
    

    DAG 将是:

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-20
      • 2017-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多