【问题标题】:How to poll from a queue 1 message at a time after downstream flow is completed in Spring Integration在 Spring Integration 中完成下游流程后,如何一次从队列 1 消息中轮询
【发布时间】:2020-04-23 06:30:47
【问题描述】:

我目前正在努力提高集成流的性能,以尝试并行化消息处理。我已经全部使用 Java DSL 实现了。

当前的集成流从带有固定轮询器的队列通道中获取消息,并通过多个处理程序一个接一个地处理消息,直到它到达最终处理程序,该处理程序会在考虑到前一个处理程序的每个输出的情况下进行一些最终计算。它们都连接在同一个集成流程中。基本上,这些处理程序包装对外部系统的调用。我需要在这里保留的重要一点是,在前一个消息的所有下游流完成之前,不得从队列中取出消息。我需要并行化的是处理程序。

当前集成流程: 消息队列 -> 轮询器 -> 处理程序 1 -> 处理程序 2 -> 处理程序 X -> 最终处理程序

我尝试在执行以下操作时合并并行性,并且效果很好。

MessageQueue -> 轮询器 -> 拆分器 -> 执行器 -> 带有子流映射到不同处理程序的路由器 -> 聚合器 -> 最终处理程序

我发现这种方法的问题是在前一条消息通过所有下游流之前从队列通道中获取了一条新消息。很清楚为什么,添加 Splitter 和 Executor 会改变消息的处理方式,但问题是不同消息的结果之间可能存在依赖关系。

问题是,我怎样才能从队列通道中一次检索一个消息,就像“暂停”轮询器一样,直到正在处理的消息下降到聚合器之后的最后一个端点?我不知道如何重新排列组件或我还能做些什么来实现这一点。

对不起,我试图寻找答案,但我找不到它...请在此处提供一些指导。 非常感谢


@Blink 这对我有用,可能需要一些重构,我相信它可以写得更优雅。对不起,我不是专家。

基本元素是:

  1. 封装消息系统的接口
  2. 调用网关方法时消息将被路由到的消息通道

    @Bean
    public DirectChannel integrationChannel() {
        return MessageChannels.direct().get();
    }
    
    @MessagingGateway
    interface WrappingGateway {
    
        @Gateway(requestChannel = "integrationChannel")
        TrackingLog executeIntegration(TrackingLog trackingLog);
    
    }
    

TrackingLog 是我用来记录下游流程结果的模型。

基本上我在集成流中调用包装网关,从消息队列中提取消息。

@Autowired
WrappingGateway integrationGateway;

@Bean
public IntegrationFlow createCatalogueChannelFlow() {
    return IntegrationFlows.from(cataloguePriorityChannel())

            // Queue Poller
            .bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
                    .id("cataloguePriorityChannelBridge"))

            // Call to Gateway 
            .handle(m -> {
                this.integrationGateway
                        .executeIntegration(((TrackingLog) m.getPayload()));
            })

            .get();
}

@Bean
public IntegrationFlow startCatalogueIntegrationChannelFlow() {
    return IntegrationFlows.from(integrationChannel())

            // Log
            .handle(trackerSupportClient, "logMessagePreExecution")

            // Set TrackingLog in message Header
            .enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
                return ((TrackingLog) m.getPayload());
            }))
 ....

整个集成有点复杂,它从异步 HTTP 网关、转换器、路由器、mongodb 中的存储等开始。这里的重点是,正如@Artem Bilan 建议我的那样,对网关的调用会阻止线程并防止队列轮询器获取更多消息,直到当前消息被完全处理。

希望对你有所帮助。

【问题讨论】:

    标签: parallel-processing spring-integration


    【解决方案1】:

    这确实是一项有趣的任务……我会与你分享我的想法,你会选择最适合你的。

    1. 我们总是可以将流的一部分包装到应该等待回复的@MessagingGateway 中。而且它的子流程有多异步已经无关紧要了。因此,您可以并行执行这些任务,但网关仍会等待主线程中的回复,阻塞队列中的下一个轮询。您应该确保在子流结束时将某些内容返回到 replyChannel 以解除对主线程的阻塞。在此处查看文档:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/messaging-endpoints.html#gateway

    2. 我们有一个开箱即用的BarrierMessageHandler 组件。这确实是用消息阻塞当前线程,直到某个触发器到达消息所属的相关性。只有这个组件的问题是你需要弄清楚如何为第一条消息释放障碍,因为这条消息将作为下一条消息的触发器。虽然我们可能可以使用一次性路由器绕过第一条消息的障碍......文档在这里:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/message-routing.html#barrier

    3. 我们有一个像MessageSourcePollingTemplate 这样的组件。因此,您可以在需要时调用包裹在 MessageSource lambda 中的 QueueChannel。我现在有点想不出这如何适应流程,但这是如何暂停轮询的另一个想法。请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/core.html#deferred-acks-message-source

    4. 1234563这样,您可以保持一个状态,直到消息被处理,并且每个轮询任务都将跳过,直到您重置该状态。文档:https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/messaging-endpoints.html#endpoint-pollingconsumer

    【讨论】:

    • 对于#4,请参阅SimpleActiveIdleMessageSourceAdvice
    • 是的,作为借用想法的样本。它不能用于PollingConsumer,因为它是为MessageSource 设计的。
    • 太棒了!有用!我选择了第一个似乎更实用和简单的解决方案,我很高兴在概念上我也考虑了选项 2 和 4。感谢你们两位抽出宝贵的时间,非常有帮助!
    • @LucasFavaro 您能否提供一个代码示例,您是如何将流程包装在@MessagingGateway 中的?
    • @Blink 我刚刚编辑了原始帖子以向您展示如何操作。
    猜你喜欢
    • 2019-06-29
    • 2014-07-28
    • 1970-01-01
    • 2023-04-10
    • 2014-07-03
    • 2018-12-28
    • 2016-12-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多