【问题标题】:Airflow trigger tasks only based on previous runs status气流触发任务仅基于以前的运行状态
【发布时间】:2018-08-16 20:41:42
【问题描述】:

有没有办法根据之前的任务运行状态触发下一个任务。场景如下:

  • Task1 - m DAG 中的第一个任务
  • Task2 - 仅在 task1 成功时运行 task2
  • Task3 - 仅在 task3 成功后运行任务 3
  • Task4 - 仅当 task1 运行超过 10 小时(错过 SLA)时才运行任务 4

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    这里有多种选择:

    1. 使用触发规则,请参阅trigger-rules 了解如何使用它们。
    2. 使用 on_failure_callback 和 on_success_callback 定义任务失败/成功时会发生什么,请参阅this post 或 BaseOperator API Reference 中的定义(请参阅参数 -> on_failure_callback,on_success_callback)。
    3. 如果您只希望在失败或 SLA 未命中的情况下发送电子邮件,并且在这种情况下不应执行其他任务,请定义:
      default_args = {'email': ['some_email_adress'],'email_on_failure': True"},然后气流将发送带有错误/SLA 的电子邮件错过定义的电子邮件。

    【讨论】:

    • 还有,BranchPythonOperator
    • 谢谢克里斯。我没有提到我正在尝试发送松弛消息而不是电子邮件。我应该能够动态地将此消息发送为 "task succeeded"、"task failed" & "task Running(sla not met)" 。如何捕获 task1 的这 3 种状态?
    • 默认失败和成功的最简单设置是 task1 下游的 2 个 slack 运算符,一个具有 trigger_rule='all_success' 和一个具有 'all_failed'。现在我从来没有使用过 SLA,所以如果任务失败,如果它错过了 SLA,或者如果气流只发送一封电子邮件,我不会这样做。如果失败,您必须弄清楚如果失败是由 SLA 未命中引起的,如何获取信息。如果在错过 SLA 时任务失败,并且您知道如何从 task_instance 获取该信息,您可能可以在故障松弛操作员消息中使用 jinja 模板。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-29
    • 1970-01-01
    • 2019-06-05
    • 1970-01-01
    • 2019-11-14
    相关资源
    最近更新 更多