【问题标题】:How to skip a task in airflow without skipping its downstream tasks?如何跳过气流中的任务而不跳过其下游任务?
【发布时间】:2021-04-23 17:04:27
【问题描述】:

假设这是我的 dag: A >> B >> C

如果任务 B 引发异常,我想跳过任务而不是失败。但是,我不想跳过任务 C。我查看了 AirflowSkipException 和 soft_fail 传感器,但它们也都强制跳过了下游任务。有没有人有办法做到这一点?

谢谢!

【问题讨论】:

    标签: airflow-scheduler directed-acyclic-graphs airflow


    【解决方案1】:

    您可以参考trigger_rule 上的 Airflow 文档。

    trigger_rule 允许你配置任务的执行依赖。通常,当所有上游任务都成功时,才会执行一个任务。您可以将其更改为其他trigger rules provided in Airflowall_failed 触发规则仅在所有上游任务都失败时执行任务,这将完成您概述的内容。

    from datetime import datetime
    
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.trigger_rule import TriggerRule
    
    with DAG(
            dag_id="my_dag",
            start_date=datetime(2021, 4, 5),
            schedule_interval='@once',
    ) as dag:
        p = PythonOperator(
            task_id='fail_task',
            python_callable=lambda x: 1,
        ) 
        
        t = PythonOperator(
            task_id='run_task',
            python_callable=lambda: 1,
            trigger_rule=TriggerRule.ALL_FAILED
        )
        
        p >> t
    

    【讨论】:

      【解决方案2】:

      当前发布的答案涉及不同的主题或似乎不完全正确。

      将触发规则 all_failed 添加到 Task-C 将不适用于 OP 的示例 DAG:A >> B >> C,除非 Task-A 以 failed 状态结束,这很可能是不可取的。

      实际上,OP 非常接近,因为可以通过混合使用 AirflowSkipExceptionnone_failed 触发规则来实现预期的行为:

      from datetime import datetime
      
      from airflow.exceptions import AirflowSkipException
      from airflow.models import DAG
      from airflow.operators.dummy import DummyOperator
      from airflow.operators.python import PythonOperator
      
      with DAG(
          dag_id="mydag",
          start_date=datetime(2022, 1, 18),
          schedule_interval="@once"
      ) as dag:
      
          def task_b():
              raise AirflowSkipException
      
          A = DummyOperator(task_id="A")
          B = PythonOperator(task_id="B", python_callable=task_b)
          C = DummyOperator(task_id="C", trigger_rule="none_failed")
      
          A >> B >> C
      

      哪个Airflow执行如下:

      这条规则是什么意思?

      触发规则

      none_failed:所有上游任务都没有失败或upstream_failed - 即所有上游任务都已成功或被跳过

      所以基本上我们可以在我们的代码中捕获实际异常并引发提到的“强制”任务状态从 failed 更改为 skipped 的 Airflow 异常。 但是,如果没有 Task-C 的 trigger_rule 参数,我们最终会将 Task-B 下游标记为 skipped

      【讨论】:

        【解决方案3】:

        您可以在任务声明中更改trigger_rule

        task = BashOperator(
            task_id="task_C",
            bash_command="echo hello world",
            trigger_rule="all_done",
            dag=dag
        )
        

        【讨论】:

          猜你喜欢
          • 2019-08-05
          • 1970-01-01
          • 1970-01-01
          • 2021-09-07
          • 1970-01-01
          • 2022-08-03
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多