【问题标题】:Airflow "none_failed" skipping when upstream skips上游跳过时气流“none_failed”跳过
【发布时间】:2020-01-08 09:26:02
【问题描述】:

我有一个工作流程,其中有两个并行进程(sentinel_runsentinel_skip),它们应该根据条件运行或跳过,然后连接在一起(resolve)。我需要直接在 sentinel_ 任务下游的任务进行级联跳过,但是当它到达 resolve 任务时,resolve 应该运行,除非上游任一进程出现故障。

基于documentation,“none_failed”触发规则应该起作用:

none_failed:所有父母都没有失败(失败或upstream_failed),即所有父母都成功或被跳过

这也是对related question 的回复。

但是,当我实现一个简单的示例时,我看到的不是这样:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

dag = DAG(
    "testing",
    catchup=False,
    schedule_interval="30 12 * * *",
    default_args={
        "owner": "test@gmail.com",
        "start_date": days_ago(1),
        "catchup": False,
        "retries": 0
    }
)

start = DummyOperator(task_id="start", dag=dag)

sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)

a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)

resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")

start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve

resolve >> g

此代码创建以下 dag:

问题是应该执行 resolved 任务(因为上游没有 upstream_failedfailed),但它正在跳过。

我已经自省了数据库,没有隐藏任何失败或上游失败的任务,我不明白为什么它不遵守“none_failed”逻辑。

我知道"ugly workaround" 并已在其他工作流中实现它,但它增加了另一个要执行的任务,并增加了 DAG 的新用户必须了解的复杂性(尤其是当您将其乘以多个任务时.. .)。这是我从 Airflow 1.8 升级到 Airflow 1.10 的主要原因,所以我希望我缺少一些明显的东西......

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    记录这一点,因为这个问题困扰了我两次,现在我已经解决了两次。

    问题分析

    当您将日志级别设置为 DEBUG 时,您开始看到发生了什么:

    [2019-10-09 18:30:05,472] {python_operator.py:114} INFO - Done. Returned value was: False
    [2019-10-09 18:30:05,472] {python_operator.py:159} INFO - Condition result is False
    [2019-10-09 18:30:05,472] {python_operator.py:165} INFO - Skipping downstream tasks...
    [2019-10-09 18:30:05,472] {python_operator.py:168} DEBUG - Downstream task_ids [<Task(DummyOperator): f>, <Task(DummyOperator): g>, <Task(DummyOperator): d>, <Task(DummyOperator): resolve>, <Task(DummyOperator): e>]
    [2019-10-09 18:30:05,492] {python_operator.py:173} INFO - Done.
    

    由此可以看出问题不是“none_failed”处理任务不正确,而是模拟跳过条件的哨兵标记all 直接跳过下游依赖项。 这是 ShortCircuitOperator 的一种行为 - 跳过所有下游,包括下游任务的下游任务。

    解决方案

    解决这个问题的方法在于认识到是 ShortCircuitOperator 的行为,而不是 TriggerRule 的行为导致了问题。一旦我们意识到这一点,就该着手编写一个更适合我们实际尝试完成的任务的运算符了。

    我已经包含了我当前使用的运算符;我欢迎任何有关处理单个下游任务修改的更好方法的意见。我确信“跳过下一个,让其余的根据他们的触发规则级联”有一个更好的习语,但我已经花了比我想要的更多的时间,我怀疑答案在更深层次内部结构。

    """Sentinel Operator Plugin"""
    
    import datetime
    
    from airflow import settings
    from airflow.models import SkipMixin, TaskInstance
    from airflow.operators.python_operator import PythonOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.state import State
    
    
    class SentinelOperator(PythonOperator, SkipMixin):
        """
        Allows a workflow to continue only if a condition is met. Otherwise, the
        workflow skips cascading downstream to the next time a viable task
        is identified.
    
        The SentinelOperator is derived from the PythonOperator. It evaluates a
        condition and stops the workflow if the condition is False. Immediate
        downstream tasks are skipped. If the condition is True, downstream tasks
        proceed as normal.
    
        The condition is determined by the result of `python_callable`.
        """
        def execute(self, context):
            condition = super(SentinelOperator, self).execute(context)
            self.log.info("Condition result is %s", condition)
    
            if condition:
                self.log.info('Proceeding with downstream tasks...')
                return
    
            self.log.info('Skipping downstream tasks...')
    
            session = settings.Session()
    
            for task in context['task'].downstream_list:
                ti = TaskInstance(task, execution_date=context['ti'].execution_date)
                self.log.info('Skipping task: %s', ti.task_id)
                ti.state = State.SKIPPED
                ti.start_date = datetime.datetime.now()
                ti.end_date = datetime.datetime.now()
                session.merge(ti)
    
            session.commit()
            session.close()
    
            self.log.info("Done.")
    
    
    class Plugin_SentinelOperator(AirflowPlugin):
        name = "sentinel_operator"
        operators = [SentinelOperator]
    

    通过修改,这将产生预期的 dag 结果:

    【讨论】:

    • 不错的实现,想知道您是否考虑过一种方法来标记仅通过一个条件运算符跳过的所有下游任务?如果我没记错的话,d、e、f 都得使用 sentinelOperator。
    • 谢谢!我确实考虑过 - 我认为您所描述的实际上类似于 ShortCircuitOperator 的默认行为。它当前的设置方式,当e 检查d 的状态时,all_success 触发规则将e 设置为在跳过d 时跳过。这让我可以通过触发规则控制流程。我确实使用分支选择做了类似于您在另一个项目中描述的事情。我附上了 sn-p,但如果您想了解更多详细信息,请告诉我:gitlab.com/snippets/1910134
    • 好吧,不知怎的,我错过了短路操作符。哇-多么疏忽。这就是我要找的。您的操作员几乎肯定会在其他情况下派上用场。谢谢!
    • 我们应该用 pendulum.now() 代替 datetime.now()
    • 稍微脏​​一点的选项,以防您不希望按照我在此处的回答使用自定义运算符:stackoverflow.com/a/63270072/1683314
    【解决方案2】:

    这似乎是 Airflow 中的一个错误。如果您想修复它,请将您的声音添加到https://issues.apache.org/jira/browse/AIRFLOW-4453

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-01-05
      • 1970-01-01
      • 2019-08-05
      • 1970-01-01
      • 1970-01-01
      • 2020-02-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多