【问题标题】:Can a flow be terminated if a specific task fails?如果特定任务失败,是否可以终止流程?
【发布时间】:2020-10-06 05:06:47
【问题描述】:

我的流程很长,如果特定任务失败,我想将其状态标记为Failed

我阅读了文档here,但是他们似乎没有指定这种情况。 我的猜测是我需要在 @task() 装饰器中以某种方式具体说明这一点,但我不知道如何。

非常感谢任何指导。

【问题讨论】:

    标签: etl prefect


    【解决方案1】:

    这可以通过将您的特定任务声明为 Flow 的“参考任务”来实现。 Reference tasks 是最终确定每个流运行的最终状态的任务。

    例如,以下代码 sn-p 创建一个流程,其中包含两个独立的、不相关的任务,这些任务随机失败一半。但是,整体流运行状态将仅根据task_one 的状态来确定,因为我们将该任务指定为唯一的参考任务:

    import random
    from prefect import task, Flow
    
    
    @task
    def task_one():
        if random.random() > 0.5:
            raise ValueError("Random failure")
    
    @task
    def task_two():
        if random.random() > 0.5:
            raise ValueError("Random failure")
    
    
    flow = Flow("Two Task Flow", tasks=[task_one, task_two])
    flow.set_reference_tasks([task_one])
    

    【讨论】:

    • 嗨@chriswhite,所以这个答案决定了流状态。但是,如果这些任务在一个大 DAG 中相关,那么如果我在流程中间设置了一个参考任务并且该任务失败了,这会停止流程吗?
    • 不幸的是,目前还没有,尽管我可以看到那个用例。如果您使用的是 API 后端,则可以创建一个任务状态处理程序,如果引用任务失败,该处理程序会调用 cancel_flow_run,这将实现类似(尽管不相同)的行为
    • 非常感谢。我会将其作为功能请求提交。
    最近更新 更多