【发布时间】:2022-03-01 23:37:28
【问题描述】:
我正在处理的工作流程中的一个要求是等待某个事件在给定时间内发生,如果没有发生,则将该任务标记为失败,但仍应执行下游任务。
我想知道“all_done”是否意味着所有依赖任务都完成了,无论它们是否成功。
【问题讨论】:
标签: airflow
我正在处理的工作流程中的一个要求是等待某个事件在给定时间内发生,如果没有发生,则将该任务标记为失败,但仍应执行下游任务。
我想知道“all_done”是否意味着所有依赖任务都完成了,无论它们是否成功。
【问题讨论】:
标签: airflow
https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-trigger-rules
all_done 表示所有操作都已完成。也许他们成功了,也许没有。
all_success 表示所有操作都已完成且没有错误
所以你的猜测是正确的
【讨论】:
总结
如果 SUCCESS、FAILED、UPSTREAM_FAILED、SKIPPED 任务的计数大于或等于所有上游任务的计数,则任务“全部完成”。
不知道为什么会大于?也许 subdags 对计数做了一些奇怪的事情。
如果上游任务的计数和成功上游任务的计数相同,则任务“全部成功”。
详情
评估触发规则的代码在这里https://github.com/apache/incubator-airflow/blob/master/airflow/ti_deps/deps/trigger_rule_dep.py#L72
以下代码运行qry 并将第一行(查询是一个无论如何只会返回一行的聚合)返回到以下变量中:
successes, skipped, failed, upstream_failed, done = qry.first()
查询中的“完成”列对应于:func.count(TI.task_id),换句话说,是所有匹配过滤器的任务的计数。
过滤器指定它只计算上游任务,从当前 dag 开始,从当前执行日期开始,并且:
TI.state.in_([
State.SUCCESS, State.FAILED,
State.UPSTREAM_FAILED, State.SKIPPED])
所以done 是具有这 4 个状态之一的上游任务的计数。
后面有这段代码
upstream = len(task.upstream_task_ids)
...
upstream_done = done >= upstream
而实际的触发规则只在此失败
if not upstream_done
代码相当简单,概念直观
num_failures = upstream - successes
if num_failures > 0:
... it fails
【讨论】:
考虑将ShortCircuitOperator 用于您所说的目的。
【讨论】:
所有操作符都有一个 trigger_rule 参数,它定义了触发生成任务的规则。
我在以下用例中使用了这些触发规则:
all_success:(默认)所有父母都成功
all_done:所有父母都已完成执行。
To carry out cleanups irrespective of the upstream tasks
succeeded or failed then setting this trigger_rule to ALL_DONE is always useful.
one_success:只要至少有一个父母成功就触发,它不会等待所有父母都完成
To trigger external DAG after successful completion of the single upstream parent.
one_failed:只要至少有一个父节点失败就触发,它不会等待所有父节点都完成
To trigger the alerts once at least one parent fails or for any other use case.