【发布时间】:2020-01-08 09:26:02
【问题描述】:
我有一个工作流程,其中有两个并行进程(sentinel_run 和 sentinel_skip),它们应该根据条件运行或跳过,然后连接在一起(resolve)。我需要直接在 sentinel_ 任务下游的任务进行级联跳过,但是当它到达 resolve 任务时,resolve 应该运行,除非上游任一进程出现故障。
基于documentation,“none_failed”触发规则应该起作用:
none_failed:所有父母都没有失败(失败或upstream_failed),即所有父母都成功或被跳过
这也是对related question 的回复。
但是,当我实现一个简单的示例时,我看到的不是这样:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago
dag = DAG(
"testing",
catchup=False,
schedule_interval="30 12 * * *",
default_args={
"owner": "test@gmail.com",
"start_date": days_ago(1),
"catchup": False,
"retries": 0
}
)
start = DummyOperator(task_id="start", dag=dag)
sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)
a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)
resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")
start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve
resolve >> g
此代码创建以下 dag:
问题是应该执行 resolved 任务(因为上游没有 upstream_failed 或 failed),但它正在跳过。
我已经自省了数据库,没有隐藏任何失败或上游失败的任务,我不明白为什么它不遵守“none_failed”逻辑。
我知道"ugly workaround" 并已在其他工作流中实现它,但它增加了另一个要执行的任务,并增加了 DAG 的新用户必须了解的复杂性(尤其是当您将其乘以多个任务时.. .)。这是我从 Airflow 1.8 升级到 Airflow 1.10 的主要原因,所以我希望我缺少一些明显的东西......
【问题讨论】: