【问题标题】:Airflow: is there a way to group operators together outside of a dag?Airflow:有没有办法在 dag 之外将操作员分组?
【发布时间】:2021-09-06 08:54:33
【问题描述】:

有没有办法设计一个在 dag 之外实现特定数据管道模式的 python 类,以便将此类用于需要此模式的所有数据管道?

示例:为了将数据从 Google Cloud Storage 加载到 Big Query,该过程可以是通过数据质量测试来验证提取候选文件。然后尝试在 Big Query 的原始表中加载数据,然后根据加载结果将文件分派到存档或被拒绝的文件夹中。

做一次很容易,如果需要做1000次怎么办?我想弄清楚如何优化工程时间。

可以考虑使用 SubDag,但它在 performances 方面存在局限性,并且将被弃用 anyway

任务组需要成为要实施的 dag 的一部分 https://github.com/apache/airflow/blob/1be3ef635fab635f741b775c52e0da7fe0871567/airflow/utils/task_group.py#L35

实现预期行为的一种方法可能是从利用动态 DAGing 的单个 python 文件生成 dag、任务组和任务

尽管如此,在这个特定文件中使用的代码不能在代码库的某个地方重用。尽管 DRYness 与可理解性始终是一个权衡,但它与 DRYness 相悖。

【问题讨论】:

    标签: python airflow google-cloud-composer


    【解决方案1】:

    基于此article

    这是解决这个问题的方法:

    您可以在气流./plugins 中定义一个插件 让我们在 ./plugins/test_taskgroup.py 中创建一个示例任务组

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.python import PythonOperator
    from airflow.utils.task_group import TaskGroup
    
    def hello_world_py():
            print('Hello World')
    
    def build_taskgroup(dag: DAG) -> TaskGroup:
        
        with TaskGroup(group_id="taskgroup") as taskgroup:
            dummy_task = DummyOperator(
                task_id="dummy_task",
                dag=dag
            )
            python_task = PythonOperator(
                task_id="python_task",
                python_callable=hello_world_py,
                dag=dag
            )
    
        dummy_task >> python_task
        return taskgroup
    
    

    您可以像这样在一个简单的 python DAG 中调用它:

    from airflow.utils import task_group
    from test_plugin import build_taskgroup
    from airflow import DAG
    
    
    with DAG(
        dag_id="modularized_dag",
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
    ) as dag:
    
        task_group = build_taskgroup(dag)
    
    

    这是结果

    【讨论】:

      【解决方案2】:

      我也对这个问题感兴趣。 Airflow 2.0 发布了 Dynamic DAG 的新功能。虽然我不确定它是否会完全回答您的设计。它可以解决单一代码库的问题。就我而言,我有一个功能可以创建一个具有必要参数的任务组。然后我用函数迭代创建每个 DAG,以创建具有不同参数的任务组。这是我的伪代码的概述:

      def create_task_group(group_id, a, b, c):
          with TaskGroup(group_id=group_id) as my_task_group:
              # add some tasks
              pass
      
      for x in LIST_OF_THINGS:
          dag_id = f"{x}_workflow"
          schedule_interval = SCHEDULE_INTERVAL[x]
      
          with DAG(
              dag_id,
              start_date=START_DATE,
              schedule_interval=schedule_interval,
          ) as globals()[dag_id]:
              task_group = create_task_group(x, ..., ..., ...)
      

      这里的LIST_OF_THINGS 代表不同配置的列表。每个 DAG 可以有不同的dag_idschedule_intervalstart_date 等。您可以在一些配置文件中定义您的任务配置,例如 JSON 或 YAML,并将其解析为字典。

      我没有尝试过,但从技术上讲,如果您需要重用相同的功能,您也许可以将create_task_group() 移动到某个类中并导入它。任务组的另一个好处是可以将任务依赖添加到其他任务或任务组中,非常方便。

      我看到了一个使用额外包为 Airflow DAG 配置 YAML 的概念,但我不确定它是否成熟。

      在此处查看有关动态 DAG 的更多信息:https://www.astronomer.io/guides/dynamically-generating-dags

      【讨论】:

      • 这篇文章回答了我们的问题towardsdatascience.com/…
      • 我明白了,感谢分享@flomalb。听起来我们要走一个好方法,只需将 task_group 函数移动到可导入的地方。 :)
      【解决方案3】:

      您应该创建自己的 Operator,然后在您的 DAG 中使用它。 扩展 BaseOperator 并使用挂钩到 BigQuery 或任何您需要的东西。

      【讨论】:

      • 请在您的回答中提供更多详细信息。正如目前所写的那样,很难理解您的解决方案。
      • 这似乎是一种替代解决方案,但是,通过挂钩将多个运算符组合在一起的自定义运算符在 UI​​ 中缺乏可见性,需要构建以支持幂等性。此外,您需要自行维护此运算符并进行适当的测试
      猜你喜欢
      • 2023-02-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-01
      • 1970-01-01
      • 2017-08-21
      • 2022-10-07
      相关资源
      最近更新 更多