【问题标题】:How to avoid running the rest of a dagster pipeline under certain conditions如何避免在某些条件下运行 dagster 管道的其余部分
【发布时间】:2020-09-13 10:15:07
【问题描述】:

假设我在 Dagster 中有两个实体连接在管道上。第一个实体可能会执行一些处理并生成有效输入,以便管道的其余部分执行,或者生成不应进一步处理的无效输入。为了达到这个结果,我在数据满足无效条件时引发错误,因此管道停止并跳过其余的实体。

提出一个错误来解决我的用例似乎很棘手,有没有一种方法可以让我跳过管道其余部分的执行而不诉诸异常?

from dagster import solid, pipeline

@solid
def solid_1(context, x: int):
    y = x + 1

    if y%2 == 0:
        raise "No even number is further processed"

    return y

@solid
def solid_2(context, y:int):
    return y**2

@pipeline
def toy_pipeline():
    solid_2(solid_1())

在这个非常人为的示例中,实体 2 仅应在第一个实体的输出为奇数时执行。

在我的实际用例中,第一个实体轮询数据库,有时找不到要处理的数据。在这种情况下,不要将执行标记为失败,而是标记为成功。可以在每个下游实体中检查数据是否满足条件,但这很快就会增加样板。当接收数据的实体找不到要处理的数据时,最好有一种方法跳过所有下游实体的执行。

【问题讨论】:

    标签: dagster


    【解决方案1】:

    为了实现您想要的行为,可以使用相应OutputDefinition 上的is_required=False 参数将输出标记为可选。这意味着输出不一定必须由实体产生。

    如果未产生可选输出,则依赖于输出的所有下游固体将简单地跳过。这对于短路管道(这是您的用例)或更复杂的分支逻辑都很有用。跳过实体时,管道运行不会标记为失败。

    您使用类型提示来定义输入和输出类型,但由于您需要指定is_required 参数,因此您需要使用显式OuputDefinition

    from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
    from typing import List
    
    def query_db():
        return []
    
    @solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
    def solid_1(context):
        rows = query_db()
    
        if len(rows) > 0:
            yield Output(rows, output_name="data")
    
    
    @solid
    def solid_2(context, data: List[int]):
        context.log.info(str(data))
        pass
    
    
    @pipeline
    def my_pipeline():
        solid_2(solid_1())
    

    也可以使用InputDefinition 代替类型提示来定义实体solid_2。类型提示是InputDefinitions 的语法糖:

    @solid(input_defs=[InputDefinition('data', List[int])])
    def solid_2(context, data):
        context.log.info(str(data))
        # Process data
        pass
    

    附带说明:通常,异常是将实体标记为失败的正确方法,并且在 Dagster 代码中不被视为 hacky。

    【讨论】:

    • 完美!,我想在这种情况下,输出的产量必须是明确的,对吗?此外,第一个实体的函数不会有返回值。我也同意标记为失败是有道理的。然而,在我们的用例中,我们希望监控在计划中运行的许多管道是否存在可能的错误,并且当我们查看 dagit 中的管道执行记录时,在预期条件下引发异常会将具有真正意外错误的管道与这些预期情况混为一谈。跨度>
    • 是的,你是对的,你需要一个明确的 yieldOutput,而不是仅仅返回。如果您刚刚返回,您会遇到类型检查错误,因为您将隐式返回None,但实体应返回整数列表。对于您的用例,您也是对的,您不想引发异常,因为预计没有行。我会推荐这种可选的输出方法。这样,当没有要处理的数据时,您的运行将被标记为成功。
    猜你喜欢
    • 2019-03-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-09
    • 2021-11-05
    相关资源
    最近更新 更多