【问题标题】:Sharing information between DAGs in airflow在气流中的 DAG 之间共享信息
【发布时间】:2021-10-22 21:32:33
【问题描述】:

我有一个 dag,它告诉另一个 dag 按特定顺序创建哪些任务。

Dag 1 -> 具有任务顺序的文件

  • 每 5 分钟左右运行一次,以保持此文件的最新状态。

Dag 2 -> 运行任务

  • 每天运行。

如何使用 Airflow 在两个 DAG 之间传递这些数据。

解决方案和问题

  • 使用气流变量的问题是我无法在运行时设置它们。
  • 使用 Xcoms 的问题是它们只能在任务阶段运行,并且一旦在 Dag 2 中创建了任务,它们就被设置并且不能正确更改?
  • 将文件推送到 s3 的问题在于,由于我无法控制的团队决定的安全原因,气流实例无权从 s3 中提取。

那我该怎么办?我有哪些选择?

【问题讨论】:

  • “使用气流变量的问题是我无法在运行时设置它们”是什么意思?乍一看,我认为 DAG1 执行 Variable.set(...) 命令可以满足您的用例,但可能还有更多内容。
  • 你不能只发现顺序作为 Dag-2 的第一步吗?
  • 啊,我没有意识到Variable.set 是一个东西。它不在文档中......我可以在任务之外调用它吗?我可以在任务的设置代码中调用它吗?

标签: airflow


【解决方案1】:

第一个 DAG 的输出文件格式是什么?我会推荐以下工作流程

Dag 1 -> 更新任务顺序并将其存储在气流环境中的 yaml 或 json 文件中。

Dag 2 -> 读取文件以创建所需的任务并每天运行它们。

您需要了解气流会不断读取您的 dag 文件以获得最新配置,因此不需要额外的步骤。

【讨论】:

  • 如何执行第 1 步?我可以在哪里存放它?以及如何?
  • 这必须使用 PythonOperator 或 CustomOperator 来完成。使用 PyYaml 库,您可以从 Python 操作 Yaml 文件。您可以将文件存储在 DAG 脚本所在的同一文件夹中,或者存储在您可以在运行 Airflow 的同一环境中设置的任何其他文件夹中。唯一的要求是将此文件保存在本地,以便尽可能快地读取。
  • 我假设您已经有一个定义任务顺序的运算符。如果是这样,请尝试扩展此操作员的工作以完成此任务。
【解决方案2】:

我过去也遇到过类似的问题,这在很大程度上取决于您的设置。

如果您在 Kubernetes 上运行 Airflow,这可能会起作用。

  1. 您创建一个 PV(持久卷)和 PVC
  2. 您使用 KubernetesOperator 启动应用程序并将 PVC 挂载到它。
  3. 您将结果存储在 PVC 上。
  4. 您将 PVC 安装到另一个 pod。

【讨论】:

    猜你喜欢
    • 2023-03-28
    • 1970-01-01
    • 1970-01-01
    • 2018-11-22
    • 1970-01-01
    • 1970-01-01
    • 2016-07-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多