【问题标题】:Airflow on_failure_callback & SlackAPIPostOperator气流 on_failure_callback & SlackAPIPostOperator
【发布时间】:2018-07-06 20:21:19
【问题描述】:

尝试使用 on_failure_callback 发布到 Slack。我在下面有这个过程,我的默认 dag args 有

'on_failure_callback' : notify_slack_failure

当我的任务失败时,它会在 /tmp 中创建输出。但我的 Slack 消息没有发布。当我通过命令行对其进行测试时,它会发布到 Slack。对我缺少的东西有什么想法吗?

def notify_slack_failure(context):
   """
   Define the callback to post on Slack if a failure is detected in the Workflow
   :return: operator.execute
   """

   cmd = "echo '" + getSlackToken() +"' > /tmp/a.out"
   os.system("touch /tmp/b.out")
   os.system(cmd)
   text_message='333'
   #text_message=str(context['task_instance'])

   operator = SlackAPIPostOperator(
      task_id='failure',
      text=text_message,
      token=getSlackToken(),
      channel=SLACK_CHANNEL,
      username=SLACK_USER
   )
   os.system("touch /tmp/e1.out")
   return operator.execute(context=context)

【问题讨论】:

    标签: slack airflow


    【解决方案1】:

    乔治 - 欢迎来到 SO!尝试运行此代码。看看 Airflow 是否能够发布到 Slack,以及写入 tmp 文件的代码是否搞砸了。

    def notify_slack_failure(contextDictionary, **kwargs):  
       """
       Define the callback to post on Slack if a failure is detected in the Workflow
       :return: operator.execute
       """
       text_message='333'
       #text_message=str(context['task_instance'])
    
       operator= SlackAPIPostOperator(
          task_id='failure',
          token=getSlackToken(),
          text=text_message,
          channel=SLACK_CHANNEL,
          username=SLACK_USER,)
          return operator.execute
    

    取自SO Slack Airflow answer

    【讨论】:

    • 非常感谢..但错误是我的。我们通过 http 代理进行出站连接。将其设置在 bash 配置文件中...但忘记为 systemd 服务定义它。第一次上升..和愚蠢的错误。对不起!
    • 乔治不用担心!
    【解决方案2】:

    在命令行中,我定义了我的 http_proxy 变量,但没有为守护进程设置它们。

    【讨论】:

      猜你喜欢
      • 2021-05-04
      • 2017-12-31
      • 1970-01-01
      • 1970-01-01
      • 2021-11-23
      • 1970-01-01
      • 2021-04-01
      • 2020-12-06
      • 1970-01-01
      相关资源
      最近更新 更多