【发布时间】:2021-10-16 10:18:15
【问题描述】:
我正在使用带有 RabbitMQ 的 Spring Boot 创建应用程序。 我为 Rabbit 创建了这样的配置:
@Configuration
public class RabbitConfiguration {
public static final String RESEND_DISPOSAL_QUEUE = "RESEND_DISPOSAL";
@Bean
public Queue resendDisposalQueue() {
return new Queue(RESEND_DISPOSAL_QUEUE, true);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory (ConnectionFactory connectionFactoryr) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}
我还为这样的 Rabbit 消息创建了侦听器:
@RabbitListener(queues = RESEND_DISPOSAL_QUEUE)
public void getResendDisposalPayload(String messageBody){
LOGGER.info("[getResendDisposalPayload] message = {}", messageBody);
// And there is some business logic
}
一切都很好,但有一个问题。 当我在侦听 RESEND_DISPOSAL_QUEUE 队列的方法 getResendDisposalPayload 中遇到异常时(例如数据库的临时问题),兔子开始重新发送最后一个未处理的消息,没有任何延迟。它会产生大量日志,并且出于某种原因让我的系统感到不舒服。
正如我在这篇文章中所读到的https://www.baeldung.com/spring-amqp-exponential-backoff“虽然使用死信队列是处理失败消息的标准方法”。 为了使用这种模式,我创建了 RetryOperationsInterceptor,它定义了传递消息的计数尝试和尝试之间的延迟。 例如:
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(3)
.recoverer(messageRecoverer)
.build();
}
听起来很不错,但只有一个问题:我无法在选项 maxAttempts 中定义无限尝试量。 在 maxAttempts 之后,我必须将损坏的消息保存在某个地方并在将来处理它。它需要一些额外的代码。
问题是:有什么方法可以将 Rabbit 配置为无限重发送损坏的消息,但有一定的延迟,比如延迟一秒?
【问题讨论】:
-
我认为您应该添加
spring-AMQP或spring-rabbit标签。 -
请详细解释一下它如何帮助我解决问题?谢谢
-
因为这显然是一个框架使用问题,
Spring-AMQP framework的作者和用户通常会在这两个标签下回答问题。
标签: java spring-boot rabbitmq spring-amqp spring-rabbit