【问题标题】:Airflow default on_failure_callback气流默认 on_failure_callback
【发布时间】:2017-12-31 22:04:07
【问题描述】:

在我的 DAG 文件中,我定义了一个 on_failure_callback() 函数来发布 Slack 以防失败。

如果我为我的 DAG 中的每个运算符指定它会很好:on_failure_callback=on_failure_callback()

有没有办法自动化(例如通过 default_args 或通过我的 DAG 对象)向我的所有操作员分派?

【问题讨论】:

  • 有趣的问题,on_failure_callback 是在 BaseOperator 定义的,我能想到的唯一方法是创建自己的运算符并从 BaseOperator 继承,然后将您的 on_failure_callback() 传递到那里。想看看其他人是怎么想的
  • 感谢您的意见,但我没有信心改变像 BaseOperator 这样基本的东西。我更喜欢手动将其添加到每个操作员,但不会错过 BaseOperator 的更新(更少维护)
  • 你遇到这个错误了吗? stackoverflow.com/questions/50227670/…

标签: python operators airflow apache-airflow


【解决方案1】:

我终于找到了办法。

您可以将 on_failure_callback 作为 default_args 传递

class Foo:
  @staticmethod
  def get_default_args():
      """
      Return default args
      :return: default_args
      """

      default_args = {
          'on_failure_callback': Foo.on_failure_callback
      }

      return default_args

  @staticmethod
  def on_failure_callback(context):
     """
     Define the callback to post on Slack if a failure is detected in the Workflow
     :return: operator.execute
     """

     operator = SlackAPIPostOperator(
         task_id='failure',
         text=str(context['task_instance']),
         token=Variable.get("slack_access_token"),
         channel=Variable.get("slack_channel")
     )

     return operator.execute(context=context) 

【讨论】:

  • 这个怎么用?这是 DAG 类的子类吗?
【解决方案2】:

这里的答案较晚,但是您可以在 DAG 的默认值中指定 on_failure_callback。您只需要编写一个自定义函数,确保它可以在上下文中使用。示例:

def failure_callback(context):
    message = [
        ":red_circle: Task failed",
        f"*Dag*: {context['dag_run'].dag_id}",
        f"*Run*: {context['dag_run'].run_id}",
        f"*Task*: <{context.get('task_instance').log_url}|*{context.get('task_instance').task_id}* failed for execution {context.get('execution_date')}>",
    ]

    # Replace this return with whatever you want
    # I usually send a Slack notification here
    return "\n".join(message)


with DAG(
    ...
    default_args={
        ...
        "on_failure_callback": failure_callback,
    },
) as dag:
    ...

【讨论】:

    猜你喜欢
    • 2021-05-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-23
    • 2017-09-10
    相关资源
    最近更新 更多