【问题标题】:Problem defining DAG with Airflow 2 Taskflow API使用 Airflow 2 Taskflow API 定义 DAG 时出现问题
【发布时间】:2021-02-26 07:32:33
【问题描述】:

我使用气流 2 任务流 API 创建了一个 DAG:

with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
    cf = collect_files()
    upi = update_process_info(cf)
    for i in range(0, max_parallel_plot_tasks):
        plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi

如何通过 Taskflow API 摆脱“collect_files”到“update_process_info”的连接?

图表:

问候 奥利

【问题讨论】:

    标签: airflow airflow-2.x


    【解决方案1】:

    尝试类似:

    with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
        cf = collect_files()
        upi = None
        for i in range(0, max_parallel_plot_tasks):
            if not upi:
                upi = update_process_info(plot_files(cf, i, int(max_parallel_plot_tasks)))
            else:
                plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-06-16
      • 1970-01-01
      • 2021-12-08
      • 2020-05-11
      相关资源
      最近更新 更多