【发布时间】:2020-04-23 06:30:47
【问题描述】:
我目前正在努力提高集成流的性能,以尝试并行化消息处理。我已经全部使用 Java DSL 实现了。
当前的集成流从带有固定轮询器的队列通道中获取消息,并通过多个处理程序一个接一个地处理消息,直到它到达最终处理程序,该处理程序会在考虑到前一个处理程序的每个输出的情况下进行一些最终计算。它们都连接在同一个集成流程中。基本上,这些处理程序包装对外部系统的调用。我需要在这里保留的重要一点是,在前一个消息的所有下游流完成之前,不得从队列中取出消息。我需要并行化的是处理程序。
当前集成流程: 消息队列 -> 轮询器 -> 处理程序 1 -> 处理程序 2 -> 处理程序 X -> 最终处理程序
我尝试在执行以下操作时合并并行性,并且效果很好。
MessageQueue -> 轮询器 -> 拆分器 -> 执行器 -> 带有子流映射到不同处理程序的路由器 -> 聚合器 -> 最终处理程序
我发现这种方法的问题是在前一条消息通过所有下游流之前从队列通道中获取了一条新消息。很清楚为什么,添加 Splitter 和 Executor 会改变消息的处理方式,但问题是不同消息的结果之间可能存在依赖关系。
问题是,我怎样才能从队列通道中一次检索一个消息,就像“暂停”轮询器一样,直到正在处理的消息下降到聚合器之后的最后一个端点?我不知道如何重新排列组件或我还能做些什么来实现这一点。
对不起,我试图寻找答案,但我找不到它...请在此处提供一些指导。 非常感谢
@Blink 这对我有用,可能需要一些重构,我相信它可以写得更优雅。对不起,我不是专家。
基本元素是:
- 封装消息系统的接口
-
调用网关方法时消息将被路由到的消息通道
@Bean public DirectChannel integrationChannel() { return MessageChannels.direct().get(); } @MessagingGateway interface WrappingGateway { @Gateway(requestChannel = "integrationChannel") TrackingLog executeIntegration(TrackingLog trackingLog); }
TrackingLog 是我用来记录下游流程结果的模型。
基本上我在集成流中调用包装网关,从消息队列中提取消息。
@Autowired
WrappingGateway integrationGateway;
@Bean
public IntegrationFlow createCatalogueChannelFlow() {
return IntegrationFlows.from(cataloguePriorityChannel())
// Queue Poller
.bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
.id("cataloguePriorityChannelBridge"))
// Call to Gateway
.handle(m -> {
this.integrationGateway
.executeIntegration(((TrackingLog) m.getPayload()));
})
.get();
}
@Bean
public IntegrationFlow startCatalogueIntegrationChannelFlow() {
return IntegrationFlows.from(integrationChannel())
// Log
.handle(trackerSupportClient, "logMessagePreExecution")
// Set TrackingLog in message Header
.enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
return ((TrackingLog) m.getPayload());
}))
....
整个集成有点复杂,它从异步 HTTP 网关、转换器、路由器、mongodb 中的存储等开始。这里的重点是,正如@Artem Bilan 建议我的那样,对网关的调用会阻止线程并防止队列轮询器获取更多消息,直到当前消息被完全处理。
希望对你有所帮助。
【问题讨论】:
标签: parallel-processing spring-integration