【发布时间】:2021-06-02 15:52:33
【问题描述】:
我正在努力将阻塞顺序编排框架转换为反应式。目前,这些任务是动态的,并通过 JSON 输入输入引擎。引擎拉取类并执行run() 方法,并将状态与每个任务的响应一起保存。
如何在反应器中实现相同的链接?如果这是一个静态 DAG,我会用 flatMap 或 then 运算符链接它,但由于它是动态的,我该如何继续执行响应式任务并收集每个任务的输出?
示例:
非反应式接口:
public interface OrchestrationTask {
OrchestrationContext run(IngestionContext ctx);
}
核心引擎
public Status executeDAG(String id) {
IngestionContext ctx = ContextBuilder.getCtx(id);
List<OrchestrationTask> tasks = app.getEligibleTasks(id);
for(OrchestrationTask task : tasks) {
// Eligible tasks are executed sequentially and results are collected.
OrchestrationContext stepContext = task.run(ctx);
if(!evaluateResult(stepContext)) break;
}
return Status.SUCCESS;
}
按照上面的示例,如果我将任务转换为返回 Mono> 那么,我如何等待或链接其他任务以对先前任务的结果进行操作? 任何帮助表示赞赏。谢谢。
更新::
反应式任务示例。
public class SampleTask implements OrchestrationTask {
@Override
public Mono<OrchestrationContext> run(OrchestrationContext context) {
// Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
return Mono.just(context).delayElements(Duration.ofSeconds(2));
}
所以我将有一系列任务来完成各种事情,但每个任务的响应都依赖于前一个任务并存储在编排上下文中。每当发生错误时,编排上下文标志将设置为 false 并且通量应该停止。
【问题讨论】:
-
你能举一个你的阻塞代码的例子吗?从你的描述我无法完全想象它是如何工作的。
-
@MichaelBerry 更新了帖子。如果您还需要更多信息,请告诉我。谢谢。
-
要明确一点——这里需要什么反应?您提到 task.run() 返回一个单声道,但它是 - 还是 app.getEligibleTasks()、ContextBuilder.getCtx() 和 evaluateResult() 也需要通过返回反应式发布者来避免阻塞操作?此外,该方法目前似乎总是返回
SUCCESS,但只会在每个StepContext上执行evaluateResult(),直到其中一个返回false。这是您需要在响应式实现中模拟的行为吗? -
@michaelberry 其他方法没有阻塞操作。只有 run 会返回 Mono,它会根据 run 中编写的任务返回状态。此方法的状态为成功,但如果 evaluateResult 为 false,它将停止执行下一个任务。
-
@MichaelBerry 有什么想法吗?
标签: spring-boot spring-webflux reactor