【问题标题】:Is there a way to pause an airflow DagRun?有没有办法暂停气流 DagRun?
【发布时间】:2019-04-29 18:08:20
【问题描述】:

有没有办法在 Airflow 中暂停特定的 DagRun?

我希望能够对单个 DAG 进行多个同时执行的运行,并且我希望能够在某些点单独暂停这些运行。

取消暂停/暂停功能似乎只在 DAG 级别起作用,并暂停/取消所有 DagRuns(针对该 DAG)的执行。

我希望能够做到这一点,因为我想要一些长时间运行的异步任务,并且我不想占用一个运行无限传感器的工作人员,所以我想创建一个任务暂停 dag 和一些其他操作(例如 API 调用)将取消暂停 dag 运行。

【问题讨论】:

  • thisthis 相关;不幸的是,那些还没有解决方案

标签: airflow


【解决方案1】:

Paused 不是 DagRun 状态 (https://github.com/apache/airflow/blob/c9023fad4287213e4d3d77f4c66799c762bff7ba/airflow/utils/state.py#L65),所以没有办法专门暂停 DagRun。你可以暂停整个 DAG,我用它来处理长时间运行的 DAG,并取得了一些成功。另一种选择是利用优先级,并确保 DAG 的多个实例可以同时运行。下面是一些演示该概念的玩具代码:

@dag(schedule_interval='@once', start_date=utils.dates.days_ago(1), max_active_runs=1)
def MyDAG(is_slow=False):
    @task_group()
    def build_tasks():
        if is_slow:
            priority_weight=1
        else:
            priority_weight=100
        @task(priority_weight=priority_weight)
        def my_task():
            # your logic
            return
        return my_task()

     ti = build_tasks()
     return ti
dag = MyDAG()

is_slow 可以在 DagRun 作业配置中作为参数传递。如果您知道特定的运行需要很长时间,这种方法将确保后续的快速实例将获得优先权并在第一次慢速运行之前运行。重要的是,这种方法只有在您有很多可用的工作人员并且慢速运行触发的任务实例数量远少于工作人员数量的情况下才有效。

【讨论】:

    【解决方案2】:

    如果这是关于传感器的,那么您很幸运,因为该解决方案是在 1.10.2 版中实现的:https://issues.apache.org/jira/browse/AIRFLOW-2747

    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is requried.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the expected time until the criteria is met is. The poke
        interval should be more than one minute to prevent too much load on
        the scheduler.
    

    来源:https://github.com/apache/airflow/blob/e62ad5333cbb56ae0f2001f0f79008a21c41a983/airflow/sensors/base_sensor_operator.py#L46

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-02-09
      • 2020-12-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多