【发布时间】:2018-03-04 01:31:55
【问题描述】:
我有一个非常适合队列通道的 dsl 流。但是,当我使用 Rendezvous 通道使其同步时,我得到的确认率最多为 30 条消息/秒。我的处理程序只需 350 微秒即可完成该过程,但确认率一直很低。这大大增加了兔子队列。我什至将并发消费者扩展到 10 个并增加了预取,但这并没有帮助。然后我自己添加了几个更缩放的实例,但这有助于将 ack 速率提高到 45/秒左右。
如何让流确认更快?我预计每秒超过 500 次。
DSL 流:
SimpleMessageListenerContainer simpleMessageListenerContainer = profileTagRabbitMLCConfig.transactedChannelSpanRabbitSMLC(queueName)
simpleMessageListenerContainer?.setConcurrentConsumers(concurrentConsumer)
simpleMessageListenerContainer?.setPrefetchCount(prefetch)
return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer))
.channel(rendezvousTransformerChannel1())
.transform(myTransformer, 'transform', { e -> e.advice(adviceWithRecoverer) })
.channel(rendezvousTransformerChannel2())
.handle(myHandler, 'save', { e -> e.advice(adviceWithRecoverer) })
.get()
同步频道:
@Bean
MessageChannel rendezvousTransformerChannel1() {
return MessageChannels.rendezvous().get()
}
@Bean
MessageChannel rendezvousHandlerChannel() {
return MessageChannels.rendezvous().get()
}
容器:
SimpleMessageListenerContainer
transactedChannelSpanRabbitSMLC(CachingConnectionFactory rabbitConnectionFactory, String queueName){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer()
container.setConnectionFactory(rabbitConnectionFactory)
container.setQueueNames(queueName)
container.setChannelTransacted(true)
container
}
重试恢复建议:
Advice getRetryAdviceWithRecovery() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice()
advice.setRetryTemplate(getRetryTemplate())
advice.recoveryCallback = getRecoveryCallback() // sends message to rabbit exchange
advice
}
轮询:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(100).maxMessagesPerPoll(500L).get();
}
【问题讨论】:
标签: spring-integration spring-amqp spring-dsl