【发布时间】:2021-09-06 08:54:33
【问题描述】:
有没有办法设计一个在 dag 之外实现特定数据管道模式的 python 类,以便将此类用于需要此模式的所有数据管道?
示例:为了将数据从 Google Cloud Storage 加载到 Big Query,该过程可以是通过数据质量测试来验证提取候选文件。然后尝试在 Big Query 的原始表中加载数据,然后根据加载结果将文件分派到存档或被拒绝的文件夹中。
做一次很容易,如果需要做1000次怎么办?我想弄清楚如何优化工程时间。
可以考虑使用 SubDag,但它在 performances 方面存在局限性,并且将被弃用 anyway。
任务组需要成为要实施的 dag 的一部分 https://github.com/apache/airflow/blob/1be3ef635fab635f741b775c52e0da7fe0871567/airflow/utils/task_group.py#L35。
实现预期行为的一种方法可能是从利用动态 DAGing 的单个 python 文件生成 dag、任务组和任务
尽管如此,在这个特定文件中使用的代码不能在代码库的某个地方重用。尽管 DRYness 与可理解性始终是一个权衡,但它与 DRYness 相悖。
【问题讨论】:
标签: python airflow google-cloud-composer