【问题标题】:Optimise Consuming messages from rabbitmq using Spring Integration使用 Spring Integration 优化来自 rabbitmq 的消费消息
【发布时间】:2017-11-02 13:15:33
【问题描述】:

我正在尝试构建一个 IntegrationFlowFactory 来轻松构建集成流,以便在应用程序上下文之间传递事件。

似乎一切正常,而且活动发布得非常快。

但是我无法弄清楚为什么消费如此缓慢。添加 concurrentConsumers 或更改 prefetchCount 似乎没有任何改变。

其他帖子谈到网络速度很慢,但正如您在 RabbitConfig 中看到的那样,我使用的是 localhost。

我在这里有一个带有我的 spring 集成示例的存储库: https://github.com/teplyuska/spring-integration-example

【问题讨论】:

  • 如果增加预取“似乎没有改变任何东西”,那么它可能是监听器代码中的问题;是时候分析你的应用了。从 10 到 100 可能没有什么不同,但从 1 到 10 肯定应该(如果侦听器是轻量级的)。
  • 把你的样本拉到本地玩一下......
  • 添加更多的并发消费者会提高总吞吐量,但每个消费者的吞吐量仍然很低。
  • 查看我的答案以获得解决方案。

标签: spring-boot rabbitmq spring-integration spring-integration-dsl


【解决方案1】:

你的问题在这里:

Amqp.inboundGateway(getListenerContainer(queue, concurrentConsumers, prefetchCount)

同时,您的下游流程是单向并且不返回任何回复:

.handle(p -> {
                UpdateSecretEvent payload = (UpdateSecretEvent) p.getPayload();
                System.out.println("Account: " + payload.getAccountId() + " has secret: " + payload.getNewSecret());
 })
.get();

.handle(p -> {
                UpdateEmailEvent payload = (UpdateEmailEvent) p.getPayload();
                System.out.println("Account: " + payload.getAccountId() + " has email: " + payload.getEmail());
})
.get();

所以,AmqpInboundGateway 在其MessagingTemplate.sendAndReceive() 中等待private static final long DEFAULT_TIMEOUT = 1000L; 的回复

切换到Amqp.inboundAdapter() 就可以了。

【讨论】:

    猜你喜欢
    • 2023-03-29
    • 2020-04-20
    • 1970-01-01
    • 1970-01-01
    • 2017-06-28
    • 1970-01-01
    • 2020-04-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多