【发布时间】:2021-04-23 17:04:27
【问题描述】:
假设这是我的 dag: A >> B >> C
如果任务 B 引发异常,我想跳过任务而不是失败。但是,我不想跳过任务 C。我查看了 AirflowSkipException 和 soft_fail 传感器,但它们也都强制跳过了下游任务。有没有人有办法做到这一点?
谢谢!
【问题讨论】:
标签: airflow-scheduler directed-acyclic-graphs airflow
假设这是我的 dag: A >> B >> C
如果任务 B 引发异常,我想跳过任务而不是失败。但是,我不想跳过任务 C。我查看了 AirflowSkipException 和 soft_fail 传感器,但它们也都强制跳过了下游任务。有没有人有办法做到这一点?
谢谢!
【问题讨论】:
标签: airflow-scheduler directed-acyclic-graphs airflow
您可以参考trigger_rule 上的 Airflow 文档。
trigger_rule 允许你配置任务的执行依赖。通常,当所有上游任务都成功时,才会执行一个任务。您可以将其更改为其他trigger rules provided in Airflow。 all_failed 触发规则仅在所有上游任务都失败时执行任务,这将完成您概述的内容。
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id="my_dag",
start_date=datetime(2021, 4, 5),
schedule_interval='@once',
) as dag:
p = PythonOperator(
task_id='fail_task',
python_callable=lambda x: 1,
)
t = PythonOperator(
task_id='run_task',
python_callable=lambda: 1,
trigger_rule=TriggerRule.ALL_FAILED
)
p >> t
【讨论】:
当前发布的答案涉及不同的主题或似乎不完全正确。
将触发规则 all_failed 添加到 Task-C 将不适用于 OP 的示例 DAG:A >> B >> C,除非 Task-A 以 failed 状态结束,这很可能是不可取的。
实际上,OP 非常接近,因为可以通过混合使用 AirflowSkipException 和 none_failed 触发规则来实现预期的行为:
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
with DAG(
dag_id="mydag",
start_date=datetime(2022, 1, 18),
schedule_interval="@once"
) as dag:
def task_b():
raise AirflowSkipException
A = DummyOperator(task_id="A")
B = PythonOperator(task_id="B", python_callable=task_b)
C = DummyOperator(task_id="C", trigger_rule="none_failed")
A >> B >> C
哪个Airflow执行如下:
这条规则是什么意思?
触发规则
none_failed:所有上游任务都没有失败或upstream_failed - 即所有上游任务都已成功或被跳过
所以基本上我们可以在我们的代码中捕获实际异常并引发提到的“强制”任务状态从 failed 更改为 skipped 的 Airflow 异常。
但是,如果没有 Task-C 的 trigger_rule 参数,我们最终会将 Task-B 下游标记为 skipped。
【讨论】:
您可以在任务声明中更改trigger_rule。
task = BashOperator(
task_id="task_C",
bash_command="echo hello world",
trigger_rule="all_done",
dag=dag
)
【讨论】: