【问题标题】:Spring Integration Rendezous channel acknowledgement slowSpring Integration Rendezous 通道确认慢
【发布时间】:2018-03-04 01:31:55
【问题描述】:

我有一个非常适合队列通道的 dsl 流。但是,当我使用 Rendezvous 通道使其同步时,我得到的确认率最多为 30 条消息/秒。我的处理程序只需 350 微秒即可完成该过程,但确认率一直很低。这大大增加了兔子队列。我什至将并发消费者扩展到 10 个并增加了预取,但这并没有帮助。然后我自己添加了几个更缩放的实例,但这有助于将 ack 速率提高到 45/秒左右。

如何让流确认更快?我预计每秒超过 500 次。

DSL 流:

SimpleMessageListenerContainer simpleMessageListenerContainer = profileTagRabbitMLCConfig.transactedChannelSpanRabbitSMLC(queueName)

simpleMessageListenerContainer?.setConcurrentConsumers(concurrentConsumer)
            simpleMessageListenerContainer?.setPrefetchCount(prefetch)

            return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer))
                    .channel(rendezvousTransformerChannel1())
                    .transform(myTransformer, 'transform', { e -> e.advice(adviceWithRecoverer) })
                    .channel(rendezvousTransformerChannel2())
                    .handle(myHandler, 'save', { e -> e.advice(adviceWithRecoverer) })
                    .get()

同步频道:

@Bean
MessageChannel rendezvousTransformerChannel1() {
    return MessageChannels.rendezvous().get()
}

@Bean
MessageChannel rendezvousHandlerChannel() {
    return MessageChannels.rendezvous().get()
}

容器:

SimpleMessageListenerContainer 
transactedChannelSpanRabbitSMLC(CachingConnectionFactory rabbitConnectionFactory, String queueName){

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer()
    container.setConnectionFactory(rabbitConnectionFactory)
    container.setQueueNames(queueName)
    container.setChannelTransacted(true)
    container
}

重试恢复建议:

Advice getRetryAdviceWithRecovery() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice()
    advice.setRetryTemplate(getRetryTemplate())
    advice.recoveryCallback = getRecoveryCallback() // sends message to rabbit exchange
    advice
}

轮询:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(100).maxMessagesPerPoll(500L).get();
    }

【问题讨论】:

    标签: spring-integration spring-amqp spring-dsl


    【解决方案1】:

    促使您使用RendezvousChannels 的用例是什么?

    这非常罕见,我想我从来没有在同一个流程中看到过 2 个。

    您必须有一个用于此频道类型的轮询器;我没有看到它们,所以这意味着你有一个默认的 poller bean。

    您需要展示您的投票器,但我怀疑它没有很好地为此调整。 RendezvousChannel send() 阻塞,直到 receive() 执行某些操作。

    无论如何,如果您在侦听器容器线程上使用任何类型的线程切换(QueueChannelRendezvousChannel),您将面临消息丢失的风险。

    您可能应该从您的流程中删除那些.channel()s,而这些DirectChannels 将改用DirectChannels。

    如果您想要并发,请使用侦听器容器上的concurrentConsumers 属性。

    container.setChannelTransacted(true)

    如果您在myHandler 中发布消息,那么交易也非常昂贵。

    【讨论】:

    • 谢谢加里。只是将代码移至原始描述。我的用例是等待自动确认,直到处理程序完全处理请求。当我使用 queuechannel 时,即使转换器/处理程序未完成处理消息,消息也会立即确认。出于这个原因,我什至依赖 InboundGateway。
    • 你不应该在这里使用QueueChannelRendezvousChannel;您希望处理程序在容器线程上运行;完全删除.channel(),这样AUTO ack 在myHandler.save() 返回之前不会发生。 .from().transform().handle()。入站网关在这里也不合适。
    • 我会尝试建议的方法。感谢 Gary 今天的快速帮助。
    • 用例+:我们想要可轮询通道的原因是我们的处理程序命中的 REST 端点很少,无法处理很大的速度。因此,我们配置了默认轮询器,该轮询器每 100 毫秒节流一次,每次仅轮询 500 条消息。当我们使用 queuechannel 时,我们将队列深度配置为 1000。兔子队列有 100 万条消息。通过从可轮询通道转移到像 directchannel(默认)这样的可订阅通道,我仍然可以实现消息限制以及事务性自动确认,直到 handler.save() 同时返回?
    • 这是错误的架构,因为一旦轮询器从第一个 RendezvousChannel 获取消息,消息就会被确认。如果要控制消息率,可以考虑使用新的Polled Inbound Channel AdapterMessageSourcePollingTemplate ,可以控制消息的消耗率。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多