【问题标题】:How to skip tasks on Airflow?如何跳过 Airflow 上的任务?
【发布时间】:2018-09-05 17:54:05
【问题描述】:

我试图了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?

假设我的 DAG 图如下所示: 任务1 > 任务2 > 任务3 > 任务4

我想从 task3 手动启动我的 DAG,最好的方法是什么?

我已阅读有关 ShortCircuitOperator 的信息,但我正在寻找更多可在触发执行后应用的临时解决方案。

谢谢!

【问题讨论】:

  • skip_task_1=True 这样的BranchOperator + 变量就足够了吗?
  • 运行后,您可以通过对该任务使用“清除”来“重新运行”该任务。这行得通吗?
  • 如果你可以尝试将你的 Dag 分成两部分并使用来自 task2 > task3 的 TriggerDagRunOperator 就像这样github.com/apache/incubator-airflow/blob/…
  • BranchOperator + 变量是什么意思?你能分享一个例子吗?
  • 使用airflow.apache.org/concepts.html#variables,然后是检查“if variable=true”的BranchOperator,然后跟随跳过task1和task2的分支,直接进入task3。

标签: airflow directed-acyclic-graphs airflow-scheduler


【解决方案1】:

您可以合并 ShortCircuitOperator uses under the hoodSkipMixin 以跳过下游任务。

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)
    
    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition
    
    def execute(self, context):

        if self.condition:
           self.log.info('Proceeding with downstream tasks...')
           return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
       
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")

【讨论】:

  • 谢谢!但我一直在寻找更特别的东西——从任何 DAG 上的任何任务开始的能力。无论上游依赖项是什么以及是否满足。
  • 不知道怎么做 - 您可以将 task3 设置为运行而不管 task1 或 task2 的结果,或者更早添加一个 branchOperator 以确定运行哪个,但默认情况下所有任务都将在其中运行按照图表指示的顺序执行。
【解决方案2】:

是的,您可以通过另一个临时基础来执行此操作。 不知怎的找到了!!

你需要引发 AirflowSkipException

from airflow.exceptions import AirflowSkipException
def execute():
    if condition:
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)

【讨论】:

    【解决方案3】:

    是的,您只需单击任务 3。切换运行按钮右侧的复选框以忽略依赖项,然后单击运行。

    【讨论】:

      【解决方案4】:

      根据 Apache Airflow 的构建方式,您可以编写逻辑/分支来确定要运行哪些任务。

      但是

      您不能从中间的任何任务开始执行任务。排序完全由依赖管理(上游/下游)定义。

      但是,如果您使用 celery 运算符,您可以忽略运行中的所有依赖项,并要求气流按照您的意愿执行任务。再说一次,这不会阻止上游的任务被安排。

      【讨论】:

      • 谢谢!假设我只讲手动触发,没有任何调度
      • 那为什么要先安排呢?
      • Airflow 提供了良好的流量管理。不仅调度。而我们主要对那部分感兴趣——依赖图、并行性等等
      • 我是一个大组织的一员,Airflow 已经在那里,它提供了我们需要的大部分功能,除了我在我的问题中提出的问题。
      • 您在自相矛盾。当你有一个下游任务依赖于上游(依赖)时,只要上游有更新,就不能启动下游。到目前为止,您可以使用的最好的东西是@Ben Gregory 描述的自定义跳过运算符。正如我之前提到的,you cannot start task from any task in between 这就是依赖的含义。要启动一个任务,只要它不是第一个任务,就必须有所有上游任务的状态更新。
      【解决方案5】:

      玛雅人, 有一个非常肮脏但非常简单且最明显的解决方案。几乎 30 秒。但是,只有当您可以轻松地更新 PROD 中的代码并能够临时阻止其他人运行 DAG 时,才有可能。 只是评论你想跳过的任务

      '#task1 > 任务2 >

      任务3 > 任务4

      一个更严肃但更努力的解决方案可能是根据 start_from_task 的参数动态创建 DAG,在这种情况下,将使用此参数构建依赖关系。可以使用 Admin==>Variables 菜单在 UI 中更改参数。您可能还可以使用前一个变量的另一个导出时间变量。例如- DAG 将忽略 task1 和 task2 直到 14:05:30,之后将运行整个 DAG。

      【讨论】:

      • 我试过这个。这不是跳过任务,它只会直接导致task3和task2而无需等待。您必须将任务也放入评论中。
      猜你喜欢
      • 1970-01-01
      • 2022-09-28
      • 1970-01-01
      • 2021-06-02
      • 2021-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多