【问题标题】:Rabbit MQ + Spring Boot: delay between resend broken messagesRabbitmq + Spring Boot:重新发送损坏消息之间的延迟
【发布时间】:2021-10-16 10:18:15
【问题描述】:

我正在使用带有 RabbitMQ 的 Spring Boot 创建应用程序。 我为 Rabbit 创建了这样的配置:

@Configuration
public class RabbitConfiguration {
    public static final String RESEND_DISPOSAL_QUEUE = "RESEND_DISPOSAL";

    @Bean
    public Queue resendDisposalQueue() {
        return new Queue(RESEND_DISPOSAL_QUEUE, true);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory (ConnectionFactory connectionFactoryr) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}

我还为这样的 Rabbit 消息创建了侦听器:

@RabbitListener(queues = RESEND_DISPOSAL_QUEUE)
public void getResendDisposalPayload(String messageBody){
    LOGGER.info("[getResendDisposalPayload] message = {}", messageBody);
    // And there is some business logic
}

一切都很好,但有一个问题。 当我在侦听 RESEND_DISPOSAL_QUEUE 队列的方法 getResendDisposalPayload 中遇到异常时(例如数据库的临时问题),兔子开始重新发送最后一个未处理的消息,没有任何延迟。它会产生大量日志,并且出于某种原因让我的系统感到不舒服。

正如我在这篇文章中所读到的https://www.baeldung.com/spring-amqp-exponential-backoff“虽然使用死信队列是处理失败消息的标准方法”。 为了使用这种模式,我创建了 RetryOperationsInterceptor,它定义了传递消息的计数尝试和尝试之间的延迟。 例如:

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .backOffOptions(1000, 3.0, 10000)
            .maxAttempts(3)
            .recoverer(messageRecoverer)
            .build();
}

听起来很不错,但只有一个问题:我无法在选项 maxAttempts 中定义无限尝试量。 在 maxAttempts 之后,我必须将损坏的消息保存在某个地方并在将来处理它。它需要一些额外的代码。

问题是:有什么方法可以将 Rabbit 配置为无限重发送损坏的消息,但有一定的延迟,比如延迟一秒?

【问题讨论】:

  • 我认为您应该添加spring-AMQPspring-rabbit 标签。
  • 请详细解释一下它如何帮助我解决问题?谢谢
  • 因为这显然是一个框架使用问题,Spring-AMQP framework 的作者和用户通常会在这两个标签下回答问题。

标签: java spring-boot rabbitmq spring-amqp spring-rabbit


【解决方案1】:

兔子开始重新发送最后一个未处理的消息,没有任何延迟

这就是重新传递的工作原理:它一次又一次地重新推送相同的消息,直到您手动确认或完全放弃。重新传递之间没有延迟,因为在完成某些操作之前不会从队列中拉出一条新消息。

我无法在选项maxAttempts 中定义无限尝试次数

你试过Integer.MAX_VALUE吗?相当不错的尝试次数。

另一种方法是使用延迟交换:https://docs.spring.io/spring-amqp/docs/current/reference/html/#delayed-message-exchange

您可以使用RepublishMessageRecoverer 配置重试,以便在一些尝试用尽后发布到您的原始队列中:https://docs.spring.io/spring-amqp/docs/current/reference/html/#async-listeners

【讨论】: