【发布时间】:2021-11-26 05:34:45
【问题描述】:
我一直在尝试在集合中添加timeout,以免等待每个流程都完成。 但是当我添加超时不起作用时,因为聚合器等待每个流完成。
@Bean
public IntegrationFlow queueFlow(LogicService service) {
return f -> f.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(aFlow(service))
.recipientFlow(bFlow(service))
, aggregatorSpec -> aggregatorSpec.groupTimeout(2000L))
例如,我的流程中,其中一个延迟 2 秒,另一个延迟 4 秒
public IntegrationFlow bFlow(LogicService service) {
return IntegrationFlows.from(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(service::callFakeServiceTimeout2)
.transform((MessageDomain.class), message -> {
message.setMessage(message.getMessage().toUpperCase());
return message;
}).get();
}
我使用 Executors.newCachedThreadPool() 来并行运行。 我想释放在超时完成之前包含的每条消息
我一直在测试的另一种方法是使用默认收集器并在 scatterGather 中设置gatherTimeout,但我不知道我是否遗漏了什么 Approach gatherTimeout
更新
cmets 中给出的所有方法都经过测试并且工作正常,唯一的问题是每个操作都是在创建消息组时评估的。并且在第一条消息到达之前创建消息组。理想的方法是在 scatterer 分发请求消息时选择有效。
我的临时解决方案是使用临时发布策略,应用 GroupConditionProvider 读取我在通过网关发送消息时创建的自定义标头。唯一需要担心的是,只有在收到新消息或我设置了组超时时才会执行发布策略。
【问题讨论】:
标签: spring spring-boot spring-integration spring-integration-dsl