【问题标题】:airflow trigger_rule using ONE_FAILED cause dag failure使用 ONE_FAILED 的气流 trigger_rule 导致 dag 失败
【发布时间】:2019-08-24 06:41:17
【问题描述】:

我想要实现的是创建一个任务,如果 dag 下的任何任务失败,将在该任务中发送通知。我将触发规则应用于以下任务:

batch11 = BashOperator(
task_id='Error_Buzz',
trigger_rule=TriggerRule.ONE_FAILED,
bash_command='python /home/admin/pythonwork/home/codes/notifications/dagLevel_Notification.py') ,
dag=dag,
catchup = False
)

batch>>batch11
batch1>>batch11

现在的问题是当没有其他任务失败时,由于 trigger_rule 的原因,batch11 任务将不会执行,这是我想要的,但由于 dag 的默认 trigger_rule 是 ALL_SUCCESS,它会导致 dag 失败。有没有办法结束loophole让dag运行成功?

结果截图:

【问题讨论】:

  • trigger_rule 应该将下游任务设置为在不满足任何条件时跳过 - 这确实允许 DAG 本身成功。您能否更新一下您的 dag_run 的结果截图?
  • @andscoop 添加了关于问题的截图

标签: airflow


【解决方案1】:

我们在 Airflow 部署中做了类似的事情。这个想法是在 dag 中的任务失败时通知 slack。您可以设置一个 dag 级别配置 on_failure_callback 记录在 https://airflow.apache.org/code.html#airflow.models.BaseOperator

on_failure_callback (callable) – 任务时调用的函数 此任务的实例失败。上下文字典作为 此函数的单个参数。上下文包含对 与任务实例相关的对象,并记录在 API 的宏部分。

这是我如何使用它的示例。如果任何任务失败或成功气流调用通知功能,我可以在任何我想要的地方得到通知。

import sys
import os

from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from util.airflow_utils import AirflowUtils

schedule = timedelta(minutes=5)

args = {
  'owner': 'user',
  'start_date': days_ago(1),
  'depends_on_past': False,
  'on_failure_callback': AirflowUtils.notify_job_failure,
  'on_success_callback': AirflowUtils.notify_job_success
}

dag = DAG(
  dag_id='demo_dag',
  schedule_interval=schedule, default_args=args)

def task1():
  return 'Whatever you return gets printed in the logs!'

def task2():
  return 'cont'

task1 = PythonOperator(task_id='task1',
                       python_callable=task1,
                       dag=dag)

task2 = PythonOperator(task_id='task2',
                       python_callable=task1,
                       dag=dag)

task1 >> task2

【讨论】:

  • 嗨,我尝试像你一样在默认参数中使用 on_failure_callback。您是否遇到过此后不断发送通知的问题? dag 在实现 on_failure_callback 之前多次失败,那么那些失败的 dag-run 会导致问题吗?
  • 每次失败发送一个通知。您可以修改 dag id 以在 db 中创建一个新的 dag。
猜你喜欢
  • 2020-02-21
  • 1970-01-01
  • 2021-11-12
  • 2021-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-10
  • 2022-12-29
相关资源
最近更新 更多