【问题标题】:Rabbit mQ messages moving from ready queue to Unack queueRabbit mQ 消息从就绪队列移动到 Unack 队列
【发布时间】:2016-10-12 03:35:01
【问题描述】:

我有一个单独的发布者和消费者。我启动发布者并发布消息。现在我启动消费者,问题是消息从就绪队列移动到取消确认队列,将消息标记为重新传递,这是我想要避免的。所以我想要什么仅当我发送 ack 而不是在消费者重启或启动时才应将其标记为重新发送

配置:

@Bean
public org.springframework.amqp.rabbit.connection.Connection mqConnection() {
    CloudFactory cloudFactory = new CloudFactory();
    Cloud cloud = cloudFactory.getCloud();
    return cloud.getServiceConnector("mqservicename", ConnectionFactory.class,
            null).createConnection();
} 
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
        return RetryInterceptorBuilder.stateful().retryOperations(retryTemplate()).recoverer(new RejectAndDontRequeueRecoverer())

            .build();
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setMessageListener(new MessageListenerAdapter());
    container.setAdviceChain(new Advice[] {
           interceptor()
    });

    return container;
}



@Bean
public RetryTemplate retryTemplate(){
    Map map=new HashMap<Class<? extends Throwable>, Boolean>();
    map.put(CustomException.class, true);
    RetryTemplate retryTemplate=new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3,map));
    return retryTemplate;
}

【问题讨论】:

  • 您确定要确认消息吗?
  • 是的,我是。现在有了 Garys 的评论,我将其关闭并尝试使用上述配置。但它只提供 1 次尝试而不是 3 次

标签: rabbitmq rabbitmq-exchange spring-rabbit


【解决方案1】:

如果您使用的是 Spring AMQP,则需要在提出此类问题时显示您的配置。

如果您将确认模式设置为手动,则您需要对确认负责。阅读the documentation。对于AUTO ackmode,当监听器正常返回时,容器会确认消息。

【讨论】:

  • 我可以通过更改代码中的重新交付逻辑来解决这个问题。但是现在的问题是我可以正确测试的少量数据,但是对于大量数据,我遇到了以下异常:
  • org.springframework.amqp.rabbit.connection.CachingConnectionFactory - 通道关闭:通道错误;协议方法:#method(reply-code=406, reply-text=PRECONDITION_FAILED - 未知交付标签 123, class-id=60, method-id=80)
  • 要添加详细信息,我根据重新传递标志给消息 3 次机会。对于每次失败(一些对消息进行验证的逻辑),我使用 basicNack 并重新排队为真。在最后一次失败时,我使用 basicknack 和 requeue to false。为了成功,我使用 basicAck 和 requeue to false。
  • 投放标签是相对于渠道的。与其尝试推出自己的解决方案,不如使用带自动确认模式的有状态重试拦截器 (described here),使用 RejectAndDontRequeueRecoverer 对其进行配置,它将完全满足您的需求。
  • 我已经添加了配置。SimpleMessageListenerContainer 强制设置 connectionfactory。我尝试使用 AbstractCloudConfig 的方法但它不允许。不使用 SimpleMessageListenerContainer,它只重试一次。我发现重试是基于rabbitmq设置的一些规则,但是我想在我的消息代码中的某些验证逻辑失败时重试,即在自定义条件下重试。因此,rabbitmq如何知道它必须重试消息,因为我已将autoack设置为true channel.basicConsume()?请注意我正在为rabbitmq使用云代工服务。