【问题标题】:Use operator for failed tasks Airflow使用运算符处理失败的任务
【发布时间】:2020-09-01 16:32:34
【问题描述】:

我有一个包含许多并行运行的任务的 DAG。

  • 已创建 EMR 集群
  • 提交了多个 EMR 步骤,
  • EMRStepSensor 配置为每个 EmrAddStepsOperator 以等待步骤的结果。

如果任何步骤失败,我想发送以发送 SNS 消息。我见过一些方法,比如使用另一个运算符 (SnsPublishOperator) 和一个名为 trigger_rule="all_done" 的属性。我尝试过这样的事情:

def get_sns_operator(self, emr_env):
    return SnsPublishOperator(
        target_arn=emr_env['snstopic'],
        message="Foo",
        subject="Report of execution",
        task_id="sns_notification",
        trigger_rule="all_done",
        dag=self.dag
    )


def define_workflow(self):
    common_args = CommonArgs(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"), other_args=self.other_args)
    cluster_creator = EmrCreateJobFlowOperator(
        dag=self.dag,
        task_id='create_cluster',
        job_flow_overrides=common_args.jobflow_overrides_args(),
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        region_name='eu-west-1'
    )
    tables = Variable.get("consolidation", deserialize_json=True).get(self.other_args['system']).get('tables')
    sns_operator = self.get_sns_operator(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"))
    for table in tables:
        step_args = copy.copy(self.other_args)
        step_args['table'] = table
        step_adder = EmrAddStepsOperator(
            dag=self.dag,
            task_id='step_{table}'.format(table=table),
            job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
            aws_conn_id='aws_default',
            steps=[common_args.step_args(**step_args)]
        )

        step_checker = EmrStepSensor(
            dag=self.dag,
            task_id='watch_step_{table}'.format(table=table),
            job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(task_ids='step_" + table + "', key='return_value')[0] }}",
            aws_conn_id='aws_default',
        )
        cluster_creator.set_downstream(step_adder)
        step_adder.set_downstream(step_checker)
        step_checker.set_downstream(sns_operator)
    return self.dag

实际上,任务已成功创建。但我想知道如何传递信息状态或从watch_steps... 获取信息状态,一旦它们全部完成并发送消息以防出现错误。

任何帮助将不胜感激。谢谢。

【问题讨论】:

    标签: amazon-web-services airflow


    【解决方案1】:

    听起来您实际上想要 one_failedtrigger_rule 值 - 这样您只会在任何上游任务失败并且您不需要内省任何状态时收到通知:接收通知,有些东西需要修复。

    您可以在此处阅读有关各种触发规则的更多信息:https://airflow.apache.org/docs/stable/concepts.html#trigger-rules

    【讨论】:

    • 但我想要所有任务完成后的报告。
    • 然后有第二个任务,它执行相同的操作,但使用默认的 trigger_rule 等待所有上游成功完成。
    猜你喜欢
    • 2017-05-12
    • 2021-01-23
    • 1970-01-01
    • 1970-01-01
    • 2019-12-24
    • 2012-02-28
    • 2019-04-29
    • 1970-01-01
    • 2015-05-02
    相关资源
    最近更新 更多