【问题标题】:Spring rabbit retries to deliver rejected message..is it OK?Spring rabbit 重试传递被拒绝的消息.. 可以吗?
【发布时间】:2017-02-12 16:56:34
【问题描述】:

我有以下配置

spring.rabbitmq.listener.prefetch=1
spring.rabbitmq.listener.concurrency=1
spring.rabbitmq.listener.retry.enabled=true
spring.rabbitmq.listener.retry.max-attempts=3
spring.rabbitmq.listener.retry.max-interval=1000
spring.rabbitmq.listener.default-requeue-rejected=false //I have also changed it to true but the same behavior still happens

在我的听众中,我抛出异常 AmqpRejectAndDontRequeueException 拒绝该消息并强制兔子不要尝试重新发送它...但是兔子然后将其重新发送 3 次最后将其路由到死信队列。

这是根据我提供的配置的标准行为还是我错过了什么?

【问题讨论】:

    标签: spring-boot rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    在使用 Spring Boot 2.3.5 和 Spring AMQP Starter 2.2.12 时,此处发布的其他答案对我不起作用,但对于这些版本,我能够自定义重试策略以不重试 AmqpRejectAndDontRequeueException 异常:

    @Configuration
    public class RabbitConfiguration {
    
    @Bean
    public RabbitRetryTemplateCustomizer customizeRetryPolicy(
            @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int maxAttempts) {
        SimpleRetryPolicy policy = new SimpleRetryPolicy(maxAttempts, Map.of(AmqpRejectAndDontRequeueException.class, false), true, true);
        return (target, retryTemplate) -> retryTemplate.setRetryPolicy(policy);
    }
    

    }

    这让重试策略跳过 AmqpRejectAndDontRequeueExceptions 的重试,但像往常一样重试所有其他异常。

    这样配置,它会遍历异常的原因,如果发现 AmqpRejectAndDontRequeueException 则跳过重试。

    需要遍历原因,因为org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter#invokeHandler 将所有异常包装为ListenerExecutionFailedException

    【讨论】:

      【解决方案2】:

      您必须将重试策略配置为不针对该异常重试。

      你不能用属性做到这一点,你必须自己配置重试建议。

      如果您需要帮助,我稍后会发布一个示例。

      requeue-rejected 处于容器级别(低于堆栈重试)。

      编辑

      @SpringBootApplication
      public class So39853762Application {
      
          public static void main(String[] args) throws Exception {
              ConfigurableApplicationContext context = SpringApplication.run(So39853762Application.class, args);
              Thread.sleep(60000);
              context.close();
          }
      
          @RabbitListener(queues = "foo")
          public void foo(String foo) {
              System.out.println(foo);
              if ("foo".equals(foo)) {
                  throw new AmqpRejectAndDontRequeueException("foo"); // won't be retried.
              }
              else {
                  throw new IllegalStateException("bar"); // will be retried
              }
          }
      
          @Bean
          public ListenerRetryAdviceCustomizer retryCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
                  RabbitProperties rabbitPropeties) {
              return new ListenerRetryAdviceCustomizer(containerFactory, rabbitPropeties);
          }
      
          public static class ListenerRetryAdviceCustomizer implements InitializingBean {
      
              private final SimpleRabbitListenerContainerFactory containerFactory;
      
              private final RabbitProperties rabbitPropeties;
      
              public ListenerRetryAdviceCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
                      RabbitProperties rabbitPropeties) {
                  this.containerFactory = containerFactory;
                  this.rabbitPropeties = rabbitPropeties;
              }
      
              @Override
              public void afterPropertiesSet() throws Exception {
                  ListenerRetry retryConfig = this.rabbitPropeties.getListener().getRetry();
                  if (retryConfig.isEnabled()) {
                      RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                              ? RetryInterceptorBuilder.stateless()
                              : RetryInterceptorBuilder.stateful());
                      Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
                      retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);
                      retryableExceptions.put(IllegalStateException.class, true);
                      SimpleRetryPolicy policy =
                              new SimpleRetryPolicy(retryConfig.getMaxAttempts(), retryableExceptions, true);
                      ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
                      backOff.setInitialInterval(retryConfig.getInitialInterval());
                      backOff.setMultiplier(retryConfig.getMultiplier());
                      backOff.setMaxInterval(retryConfig.getMaxInterval());
                      builder.retryPolicy(policy)
                          .backOffPolicy(backOff)
                          .recoverer(new RejectAndDontRequeueRecoverer());
                      this.containerFactory.setAdviceChain(builder.build());
                  }
              }
      
          }
      
      }
      

      注意:您目前无法将策略配置为重试所有异常,“除了”这个 - 您必须对要重试的所有异常进行分类(并且它们不能是 @987654324 的超类@)。我开了一个issue to support this

      【讨论】:

      • 感谢@GaryRussell 的帖子 - 我无法弄清楚为什么我在消费者容器中抛出的 AmqpRejectAndDontRequeueException 正在被重试。现在我明白了 - RetryPolicy 管理这个。
      猜你喜欢
      • 1970-01-01
      • 2020-05-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-02-28
      • 1970-01-01
      • 1970-01-01
      • 2017-01-25
      相关资源
      最近更新 更多