【发布时间】:2020-01-09 16:14:38
【问题描述】:
我最近一直在使用 Airflow,发现一个非常常见的模式是循环一些集合以创建多个任务。与 github 的 example dags 文件夹中的 example_python_operator.py dag 非常相似。
我的问题与动态构建循环迭代的集合有关。假设您要为存储在数据库中的一组未知客户端中的每一个创建一个任务,并且您计划查询它们作为填充列表的一种方式。像这样:
first_task = PythonOperator(
task_id='some_upstream_task',
provide_context=True,
python_callable=some_upstream_task,
dag=dag)
clients = my_database_query()
for client in clients:
task = PythonOperator(
task_id='client_' + str(client),
python_callable=some_function,
dag=dag)
task.set_upstream(first_task)
据我所知,这意味着即使您的 dag 仅每周运行一次,您的数据库也会每 30 秒轮询一次这些客户端。即使您从迭代器设置上游运算符并通过 xcoms 返回客户端并将 my_database_query() 替换为 xcom_pull() 您仍然每 30 秒轮询一次 xcoms。这对我来说似乎很浪费,所以我想知道这种类型的 dag 是否有更好的模式?
【问题讨论】:
-
我认为动态 DAG 在 Airflow 中是一种不好的做法。在每次 DAG 渲染中,所有以前的 DAG 运行都会更改。您可能会丢失一些历史记录/日志 + 您可能会意外触发不需要的执行,因为旧 DAG 运行中的任务将以“无状态”添加。如果您触摸此类 DAG 运行,它们将被触发。
-
stackoverflow.com/questions/55672724/… 这应该会给你一些想法