【问题标题】:Dynamically building collection to loop over in Airflow dag动态构建集合以在 Airflow dag 中循环
【发布时间】:2020-01-09 16:14:38
【问题描述】:

我最近一直在使用 Airflow,发现一个非常常见的模式是循环一些集合以创建多个任务。与 github 的 example dags 文件夹中的 example_python_operator.py dag 非常相似。

我的问题与动态构建循环迭代的集合有关。假设您要为存储在数据库中的一组未知客户端中的每一个创建一个任务,并且您计划查询它们作为填充列表的一种方式。像这样:

first_task = PythonOperator(
    task_id='some_upstream_task',
    provide_context=True,
    python_callable=some_upstream_task,
    dag=dag)

clients = my_database_query()

for client in clients:
    task = PythonOperator(
        task_id='client_' + str(client),
        python_callable=some_function,
        dag=dag)

    task.set_upstream(first_task)

据我所知,这意味着即使您的 dag 仅每周运行一次,您的数据库也会每 30 秒轮询一次这些客户端。即使您从迭代器设置上游运算符并通过 xcoms 返回客户端并将 my_database_query() 替换为 xcom_pull() 您仍然每 30 秒轮询一次 xcoms。这对我来说似乎很浪费,所以我想知道这种类型的 dag 是否有更好的模式?

【问题讨论】:

  • 我认为动态 DAG 在 Airflow 中是一种不好的做法。在每次 DAG 渲染中,所有以前的 DAG 运行都会更改。您可能会丢失一些历史记录/日志 + 您可能会意外触发不需要的执行,因为旧 DAG 运行中的任务将以“无状态”添加。如果您触摸此类 DAG 运行,它们将被触发。
  • stackoverflow.com/questions/55672724/… 这应该会给你一些想法

标签: python airflow


【解决方案1】:

在您的代码示例中,我们没有看到 DAG 的计划间隔,但我假设您已经计划好了它,比如说 @daily,并且您希望数据库查询每天运行一次。

在 Airflow 中,调度程序会定期解析 DAG(因此称为“每 30 秒”)。所以你的python代码会导致问题。

在您的情况下,我会考虑改变观点:为什么不尝试在 PosgresOperator link 中运行数据库查询,然后将其作为 DAG 的一部分?根据该 Operator 的输出(例如,您可以通过 XCOM 或通过 Object Storage 中的文件传播),您可以在下游拥有一个 PythonOperator,它不会为一个客户端运行功能,而是为所有客户端运行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-23
    • 1970-01-01
    相关资源
    最近更新 更多