【问题标题】:How to integrate Apache Airflow with slack?如何将 Apache Airflow 与 slack 集成?
【发布时间】:2018-08-28 09:25:06
【问题描述】:

有人可以给我关于如何将 Apache Airflow 连接到 Slack 工作区的分步手册。 我为我的频道创建了 webhook,接下来我应该如何处理它?

亲切的问候

【问题讨论】:

    标签: airflow slack


    【解决方案1】:
    SlackAPIPostOperator(
          task_id='failure',
          token='YOUR_TOKEN',
          text=text_message,
          channel=SLACK_CHANNEL,
          username=SLACK_USER)
    

    以上是您可以使用 Airflow 向 Slack 发送消息的最简单方法。

    但是,如果您想将 Airflow 配置为在任务失败时向 Slack 发送消息,请创建一个函数并将 on_failure_callback 添加到您的任务中,并使用创建的 slack 函数的名称。下面是一个例子:

    def slack_failed_task(contextDictionary, **kwargs):  
           failed_alert = SlackAPIPostOperator(
             task_id='slack_failed',
             channel="#datalabs",
             token="...",
             text = ':red_circle: DAG Failed',
             owner = '_owner',)
             return failed_alert.execute()
    
    
    task_with_failed_slack_alerts = PythonOperator(
        task_id='task0',
        python_callable=<file to execute>,
        on_failure_callback=slack_failed_task,
        provide_context=True,
        dag=dag)
    

    使用 SlackWebHook(仅适用于 Airflow >= 1.10.0): 如果您想使用SlackWebHook,请以类似方式使用SlackWebhookOperator

    https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/slack_webhook_operator.py#L25

    【讨论】:

    • 有没有办法通过 webhook 连接?
    • 如果要使用Webhook,请使用SlackWebhookOperator检查github.com/apache/incubator-airflow/blob/…
    • @kaxil 我不得不将 slack_failed_task() 的最后一行更改为 return failed_alert.execute(),没有括号就没有发送松弛消息。否则,这是一个很好的答案,对我帮助很大。谢谢!
    【解决方案2】:

    试试 Airflow 版本中的新 SlackWebhookOperator>=1.10.0

    from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
    
    slack_msg = "Hi Wssup?"
    
    slack_test =  SlackWebhookOperator(
            task_id='slack_test',
            http_conn_id='slack_connection',
            webhook_token='/1234/abcd',
            message=slack_msg,
            channel='#airflow_updates',
            username='airflow_'+os.environ['ENVIRONMENT'],
            icon_emoji=None,
            link_names=False,
            dag=dag)
    

    注意:确保您已将 slack_connection 添加到您的 Airflow 连接中

    host=https://hooks.slack.com/services/
    

    【讨论】:

    • 这里有什么办法不使用连接吗?我正在使用 AWS Secrets Manager 来存储 webhook_token,我不想同时为它管理另一个气流连接!
    【解决方案3】:

    @kaxil 回答中SlackWebhookOperator 用法的完整示例:

    def slack_failed_task(task_name):
      failed_alert = SlackWebhookOperator(
        task_id='slack_failed_alert',
        http_conn_id='slack_connection',
        webhook_token=Variable.get("slackWebhookToken", default_var=""),
        message='@here DAG Failed {}'.format(task_name),
        channel='#epm-marketing-dev',
        username='Airflow_{}'.format(ENVIRONMENT_SUFFIX),
        icon_emoji=':red_circle:',
        link_names=True,
      )
      return failed_alert.execute
    
    task_with_failed_slack_alerts = PythonOperator(
      task_id='task0',
      python_callable=<file to execute>,
      on_failure_callback=slack_failed_task,
      provide_context=True,
      dag=dag)
    

    作为@Deep Nirmal 注意:确保您在 Airflow 连接中添加了 slack_connection

    host=https://hooks.slack.com/services/
    

    【讨论】:

    • 这个例子不完整。 “在您的 Airflow 连接中添加了 slack_connection”是什么意思?
    • @RaviR 在 Admin -> Connections 下查看 Airflow Web UI 或导航到 http://your-airflow-server:8080/admin/connection/
    猜你喜欢
    • 2018-07-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-13
    相关资源
    最近更新 更多