【问题标题】:Spring Integration aggregator only releases one group coming from AMQP backed channelSpring Integration 聚合器仅发布一组来自 AMQP 支持的通道
【发布时间】:2020-06-27 19:30:04
【问题描述】:

我的 Spring Boot 应用程序出现问题,我的聚合器中只处理了一个组,然后应用程序停止使用队列中的更多消息。它似乎只在启动时处理一个组。我重新启动了应用程序,它处理了另一个组,但它又停止了。

这是我下面的流程。

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, importQueueName).errorChannel(errorChannel))
                .split(userImportSplitter)
                .channel(Amqp.channel(connectionFactory)
                        .queueName(USER_QUEUE_NAME)
                        .prefetchCount(batchSize))
                .aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(500))
                .handle(userImporter)
                .get();

【问题讨论】:

    标签: spring-boot spring-integration spring-amqp spring-integration-dsl


    【解决方案1】:

    可能您的userImportSplitter 产生相同的conrrelationId,因此在聚合器中只形成一个组,默认情况下它不会在释放或超时后删除。

    考虑使用这些选项:

    .aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(500)
                        .expireGroupsUponCompletion(true)
                        .expireGroupsUponTimeout(true))
    

    【讨论】:

    • 还在做同样的事情。
    • 我们可以看看你的userImportSplitter在做什么吗?如果你删除中间的`.channel(Amqp.channel()`,它是如何工作的?
    • 想通了。这是userImporter 引发异常的问题。我想我的新问题是如何防止该消息阻塞队列?
    • ExpressionEvaluatingRequestHandlerAdvice:docs.spring.io/spring-integration/docs/current/reference/html/…。拆分器只是一个循环,如果您在普通的 Java for() 循环中失败,它也会以同样的方式失败。因此,为了防止过早退出循环,我们需要 try..catch 那个内部异常并对失败的项目做一些事情。
    最近更新 更多