【问题标题】:Custom Operator States (queued, success, etc.) in Apache Airflow?Apache Airflow 中的自定义操作员状态(排队、成功等)?
【发布时间】:2021-10-07 21:52:53
【问题描述】:

在 Apache Airflow (2.x) 中,每个 Operator 实例都有一个定义为 here (airflow source repo) 的状态。

我有两个用例似乎并没有明显地落入预定义的状态:

  1. 警告,但不要失败 - 这似乎应该是一个非常标准的用例,我很惊讶没有在开箱即用的气流源代码中看到它。基本上,我想用醒目的颜色对节点进行颜色编码 - 比如橙色 - 对应于非致命警告,但否则继续正常执行。显然,您可以将警告打印到日志中,但要找到它们需要更多的工作,而不仅仅是查看 DAG 页面上的彩色圆圈。

  2. “Sensor N/A”或“Data not ready” - 当传感器注意到源系统中的数据尚未准备好时分配的状态,并且可以跳过下游操作符直到下一次执行DAG,但数据管道中没有任何内容被破坏。基本上是预期的分支结束。

有没有一种通过开箱即用的 Airflow 节点状态实现这些用例的好方法?如果没有,有没有办法定义自定义运算符状态?由于我在托管服务 (MWAA) 上运行气流,我不认为更改我们部署的源代码是一种选择。

谢谢,

【问题讨论】:

    标签: airflow


    【解决方案1】:

    任务状态与 Airflow 紧密集成。无法配置哪个日志记录级别导致哪个状态。我想说最简单的方法是 grep 日志文件以获取“警告”或设置日志聚合服务,例如Elasticsearch 使日志文件可搜索。

    对于#2,传感器不知道为什么传感器超时。在达到timeoutexecution_timeout 之后,它们只会引发异常。您可以使用trigger_rules 处理异常,但这些仍然没有考虑异常的原因。

    如果您想对此进行更多控制,我会实现您自己的传感器,它需要一个参数,例如data_not_ready_timeout(小于timeoutexecution_timeout)。在poke() 方法中,检查是否已达到data_not_ready_timeout,如果是则引发AirflowSkipException。这将跳过下游任务。一旦到达timeoutexecution_timeout,任务就会失败。查看BaseSensorOperator.execute() 以获得一些灵感,以获取传感器的初始启动日期。

    【讨论】:

    • > 在 poke() 方法中,检查是否已达到 data_not_ready_timeout,如果是则引发 AirflowSkipException。这将跳过下游任务。我确实相信这适用于我的用例。谢谢,
    猜你喜欢
    • 2017-06-01
    • 2023-02-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-02
    • 1970-01-01
    相关资源
    最近更新 更多