【问题标题】:How to implement polling in Airflow?如何在 Airflow 中实现轮询?
【发布时间】:2018-01-14 09:49:09
【问题描述】:

我想使用 Airflow 来实现定期轮询外部系统(ftp 服务器等)的数据流,检查符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是 Airflow 的新手,并且读到 Sensors 是您可以在这种情况下使用的东西,而且我实际上设法编写了一个在我为其运行“气流测试”时工作正常的传感器。但是对于传感器的 poke_interval 和 DAG 调度的关系,我有点困惑。我应该如何为我的用例定义这些设置?还是我应该使用其他方法?我只希望 Airflow 在这些文件可用时运行任务,而不是在一段时间内没有新文件可用时让仪表板出现故障。

【问题讨论】:

    标签: airflow apache-airflow


    【解决方案1】:

    您的理解是正确的,当您想进行轮询时,可以使用传感器,无论是使用现有传感器还是实现自己的传感器。

    但是,它们始终是 DAG 的一部分,并且不会在其边界之外执行。 DAG 执行取决于start_dateschedule_interval,但您可以利用它和传感器根据外部服务器的状态来实现某种 DAG:一种可能的方法是使用一个传感器启动整个 DAG,该传感器检查如果条件不满足,则决定跳过整个 DAG(您可以通过将传感器的 soft_fail 参数设置为 True 来确保传感器将下游任务标记为 skipped 而不是 failed)。通过使用最频繁的调度选项 (* * * * *),您可以将轮询间隔设置为一分钟。如果您真的需要最短的轮询时间,您可以调整传感器的 poke_intervaltimeout 参数。

    但请记住,Airflow 本身可能无法保证执行时间,因此对于非常短的轮询时间,您可能需要研究替代方案(或至少考虑与我刚刚分享的方法不同的方法)。

    【讨论】:

    • 使用 soft_fail=True 似乎可以解决问题,并使我的 DAG 仪表板免受不必要的错误,所以我接受这个答案。对于调度,我将只使用 DAG 调度,然后在几次快速戳后使传感器超时。我的投票时间表通常是每小时或每天,所以我想应该这样做。还是有什么陷阱?
    • 不,每小时的时间表绝对可以。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-04-16
    • 1970-01-01
    • 1970-01-01
    • 2022-01-14
    • 2017-09-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多