【发布时间】:2021-05-04 15:10:15
【问题描述】:
我正在尝试在 Airflow 中创建一个动态生成的 DAG。
主要组件是我的代码动态生成一堆要同时运行的任务的地方(使用自定义 Operator 来创建每个任务)。所有这些任务都具有相似的格式(即相同的依赖项和相同的下游),所以我没有this question 中描述的问题。用普通的 Python 术语来说,我只需定义一个函数,然后用不同的参数多次实例化它。
但是,如果我打算将其作为常规 Python 函数执行,那么将这么多常规 Python 混合到我的 Airflow DAG 中似乎很奇怪。我对 Airflow 的理解是,最好将所有逻辑保留在 DAG 之外的自定义运算符中。
另一方面,我不确定是否有可能(如果有,如何)拥有一个自定义 Operator,它创建不同 Operator 的实例以直接在 DAG 中使用。
直接在 DAG 中包含 Python 逻辑是不好的做法吗?如果这是一个坏主意,我应该如何实现这种模式?
(使用 Airflow 1.10.12 和 Python 3.7)
【问题讨论】: