【问题标题】:Spring boot rabbitmq message not getting requeuedSpring Boot rabbitmq 消息没有被重新排队
【发布时间】:2018-10-20 00:17:42
【问题描述】:

您好,如果抛出特定异常,我正在尝试重新排队某些消息,但对于任何验证失败,我希望它们直接进入死信队列。我启用了相关的队列和死信队列。我发现我的验证失败已到达 dlq,但其他失败一直处于 unack 状态并不断重试,超出了我设置的最大尝试次数和乘数,有什么想法吗?下面的代码我使用的是 Spring boot 2.0.4 版本

@RabbitListener(queues = "${queuename}")
    public void consume(final @Valid @Payload MyRequest myRequest) {
        if (method.fail()) {
          throw new RuntimeException("");
        }
    }

 @Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(jackson2Converter());
    factory.setValidator(amqpValidator());
    return factory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

@Bean
public Validator amqpValidator() {
    return new OptionalValidatorFactoryBean();
}

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}


@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory listenerContainerFactory =
            new SimpleRabbitListenerContainerFactory();
    listenerContainerFactory.setConnectionFactory(connectionFactory());
    listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
            new MyErrorPayload()));
    listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
    return listenerContainerFactory;
}

 @Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitQueueHost);
    connectionFactory.setUsername(rabbitQueueUsername);
    connectionFactory.setPassword(rabbitQueuePassword);
    connectionFactory.setVirtualHost(rabbitQueueVirtualHost);
    return connectionFactory;
}



public class MyErrorPayload implements FatalExceptionStrategy {


@Override
public boolean isFatal(Throwable t) {
  if (t instanceof ListenerExecutionFailedException &&
        (t.getCause() instanceof MessageConversionException ||
         t.getCause() instanceof MethodArgumentNotValidException )
        ) {
      return true;
  }
    return false;
 }
}

application.yml(属性)

spring:
  rabbitmq:
    host: localhost
    username: uu
    password: pp
    virtual-host: /
    listener:
      simple:
        default-requeue-rejected: false
        retry:
          enabled: true
          initial-interval: 2000
          multiplier: 1.5
          max-interval: 10000
          max-attempts: 3

【问题讨论】:

  • 我认为存在两个不同的侦听器容器。你调试发现哪个容器调用了 isFatal ?
  • 开启调试日志以观察重试活动。如果您无法从中弄清楚,请将日志发布到某个地方。
  • @GaryRussell 我可以通过在侦听器方法中抛出 RuntimeException 并复制上面的 application.yml 来重现此示例 global-handler 的问题。你能检查一下吗?
  • @Barath,感谢您指向该示例;看我的回答。

标签: spring spring-boot rabbitmq amqp spring-amqp


【解决方案1】:

这是因为你没有为容器工厂使用 Boot 的自动配置。所以忽略重试配置。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory listenerContainerFactory =
            new SimpleRabbitListenerContainerFactory();
    listenerContainerFactory.setConnectionFactory(connectionFactory());
    listenerContainerFactory.setErrorHandler(new ConditionalRejectingErrorHandler(
            new MyErrorPayload()));
    listenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter());
    return listenerContainerFactory;
}

@Barath 在他的评论中引用的示例也是如此。

将配置器注入你的工厂方法并调用它;例如,对于那个样本...

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
        SimpleRabbitListenerContainerFactoryConfigurer configurer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setErrorHandler(errorHandler());
    return factory;
}

如果只有一个消息转换器 Bean,配置器也会添加它。

我已经更新了示例。

编辑

选择性异常的自定义重试策略;以下禁用ValidationException 的重试,但重试所有其他。 (同样,对于示例应用程序).​​..

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
        SimpleRabbitListenerContainerFactoryConfigurer configurer, RabbitProperties properties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setErrorHandler(errorHandler());
    ListenerRetry retryConfig = properties.getListener().getSimple().getRetry();
    if (retryConfig.isEnabled()) {
        RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                ? RetryInterceptorBuilder.stateless()
                : RetryInterceptorBuilder.stateful());
        RetryTemplate retryTemplate = new RetryTemplate();
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = Collections
                .singletonMap(ValidationException.class, false);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retryConfig.getMaxAttempts(),
                retryableExceptions, true, true); // retry all exceptions except Validation
        retryTemplate.setRetryPolicy(retryPolicy);
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(retryConfig.getInitialInterval().toMillis());
        backOffPolicy.setMaxInterval(retryConfig.getMaxInterval().toMillis());
        backOffPolicy.setMultiplier(retryConfig.getMultiplier());
        retryTemplate.setBackOffPolicy(backOffPolicy);
        builder.retryOperations(retryTemplate);
        builder.recoverer(new RejectAndDontRequeueRecoverer());
        factory.setAdviceChain(builder.build());
    }
    return factory;
}

自从您拥有default-requeue-rejected: false 以来,没有消息被重新排队。

【讨论】:

  • 感谢分享这个我已经包含了这个,是的,重试现在正在工作,但是我的自定义错误处理程序现在似乎没有被调用,所以一切都被重试了三遍,包括验证失败,有什么想法吗?
  • 另外我有一个集成测试来测试消息何时被放回队列中进行重试,通过进行队列计数,我希望它显示为仍有 1 条消息在队列中,但它显示为零,这是因为消息处于未确认状态吗?整数 failedMsgcountBefore = (Integer) this.amqpAdmin.getQueueProperties(queue) .get("QUEUE_MESSAGE_COUNT");
  • 这很正常。您需要自定义重试策略来进行选择性异常重试。如果您需要帮助,我可以稍后提供帮助。您正在使用无状态重试,因此它在内存中重试,而不是放回队列中。但无论如何,如果只有 1 条消息或预取为 1,即使有状态恢复,它也会立即重新传递。
  • 我想我需要那个帮助,让自己感到困惑。如果我需要自定义重试策略,我的自定义错误处理程序现在是多余的吗?还有状态恢复是否意味着我会得到队列计数,当我要求它时?在无状态模式下,如果队列出现故障,那不意味着消息会丢失吗?所以statefull会更好吗?感谢到目前为止提供的帮助非常感谢
  • 不清楚你想用你的自定义错误处理程序做什么;我本来希望看到ValidationException 周围的一些逻辑,但是,因为你有default-requeue-rejected=false,你真的不需要自定义错误处理程序——只有当你想控制哪些异常被重新排队时才需要。不,无状态重试不会导致消息丢失。不,您不会得到队列计数,因为正如我所说,消息将立即重新传递(并且重试策略将使线程休眠。我将使用自定义重试策略编辑我的答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-01-23
  • 2014-10-03
  • 2016-07-18
相关资源
最近更新 更多