【发布时间】:2020-09-01 16:32:34
【问题描述】:
我有一个包含许多并行运行的任务的 DAG。
- 已创建 EMR 集群
- 提交了多个 EMR 步骤,
-
EMRStepSensor配置为每个EmrAddStepsOperator以等待步骤的结果。
如果任何步骤失败,我想发送以发送 SNS 消息。我见过一些方法,比如使用另一个运算符 (SnsPublishOperator) 和一个名为 trigger_rule="all_done" 的属性。我尝试过这样的事情:
def get_sns_operator(self, emr_env):
return SnsPublishOperator(
target_arn=emr_env['snstopic'],
message="Foo",
subject="Report of execution",
task_id="sns_notification",
trigger_rule="all_done",
dag=self.dag
)
def define_workflow(self):
common_args = CommonArgs(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"), other_args=self.other_args)
cluster_creator = EmrCreateJobFlowOperator(
dag=self.dag,
task_id='create_cluster',
job_flow_overrides=common_args.jobflow_overrides_args(),
aws_conn_id='aws_default',
emr_conn_id='emr_default',
region_name='eu-west-1'
)
tables = Variable.get("consolidation", deserialize_json=True).get(self.other_args['system']).get('tables')
sns_operator = self.get_sns_operator(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"))
for table in tables:
step_args = copy.copy(self.other_args)
step_args['table'] = table
step_adder = EmrAddStepsOperator(
dag=self.dag,
task_id='step_{table}'.format(table=table),
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
aws_conn_id='aws_default',
steps=[common_args.step_args(**step_args)]
)
step_checker = EmrStepSensor(
dag=self.dag,
task_id='watch_step_{table}'.format(table=table),
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='step_" + table + "', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(sns_operator)
return self.dag
实际上,任务已成功创建。但我想知道如何传递信息状态或从watch_steps... 获取信息状态,一旦它们全部完成并发送消息以防出现错误。
任何帮助将不胜感激。谢谢。
【问题讨论】:
标签: amazon-web-services airflow