【发布时间】:2020-10-06 05:06:47
【问题描述】:
我的流程很长,如果特定任务失败,我想将其状态标记为Failed。
我阅读了文档here,但是他们似乎没有指定这种情况。
我的猜测是我需要在 @task() 装饰器中以某种方式具体说明这一点,但我不知道如何。
非常感谢任何指导。
【问题讨论】:
我的流程很长,如果特定任务失败,我想将其状态标记为Failed。
我阅读了文档here,但是他们似乎没有指定这种情况。
我的猜测是我需要在 @task() 装饰器中以某种方式具体说明这一点,但我不知道如何。
非常感谢任何指导。
【问题讨论】:
这可以通过将您的特定任务声明为 Flow 的“参考任务”来实现。 Reference tasks 是最终确定每个流运行的最终状态的任务。
例如,以下代码 sn-p 创建一个流程,其中包含两个独立的、不相关的任务,这些任务随机失败一半。但是,整体流运行状态将仅根据task_one 的状态来确定,因为我们将该任务指定为唯一的参考任务:
import random
from prefect import task, Flow
@task
def task_one():
if random.random() > 0.5:
raise ValueError("Random failure")
@task
def task_two():
if random.random() > 0.5:
raise ValueError("Random failure")
flow = Flow("Two Task Flow", tasks=[task_one, task_two])
flow.set_reference_tasks([task_one])
【讨论】:
cancel_flow_run,这将实现类似(尽管不相同)的行为