【问题标题】:Manually ack messages in RabbitMQ在 RabbitMQ 中手动确认消息
【发布时间】:2015-06-21 08:58:07
【问题描述】:

以前我正在阅读队列中存在的所有消息,但现在我必须根据用户的选择(计数)返回特定数量的消息。

我尝试相应地更改 for 循环,但由于自动确认,它会读取所有消息。所以我尝试在配置文件中将其更改为手动。

在我的程序中,如何在阅读 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收并且我没有频道参考)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        }
}

任何帮助都非常感谢,在此先感谢。

【问题讨论】:

  • AmqpTemplate#receive 自动确认消息,除非通道被交易。要控制确认,您可以使用AmqpTemplate#execute 并手动接收,或者最好的方法是使用SimpleMessageListenerContainer 甚至BlockingQueueConsumer
  • @NicolasLabrot 我没有在 AmqpTemplate 中找到执行方法,你指的是别的东西吗?是的,我确实在 SimpleMessageListenerContainer 中将 setAcknowledgeMode 设置为 MANUAL。
  • 对不起,我指的是RabbitTemplate#execute,它是AmqpTemplate的实现
  • @NicolasLabrot 您能否对此有所了解。什么是 ChannelCallback,看起来我需要一个我没有的频道参考。
  • 看看RabbitTemplate#receive code 但我不认为这是正确的方法。

标签: rabbitmq spring-amqp spring-rabbit


【解决方案1】:

您不能使用 receive() 方法手动确认 - 使用 SimpleMessageListenerContainer 用于带有手动确认和 ChannelAwareMessageListener 的事件驱动消费者。或者,使用模板的 execute() 方法,它可以让您访问 Channel - 但是您将使用较低级别的 RabbitMQ API,而不是 Message 抽象。

编辑:

您需要学习底层的 RabbitMQ Java API 才能使用执行,但是这样的东西会起作用...

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });

【讨论】:

  • GaryRussell 能否请您指出任何有关如何使用 execute() 的示例代码,我对此很陌生,我没有太多关于此的论坛。提前致谢
  • 如果回答您的问题,习惯上将其标记为已接受(单击复选标记)。这将有助于其他用户搜索相同的答案。
  • @GaryRussell 如果我们使用 receive* 方法,Spring 是使用 NONE 模式还是 AUTO 模式?意思是,是让 RabbitMQ 认为消息是自动传递的,还是 Spring 在收到消息后做 ACK?
  • 不要在 cmets 中提出新问题,尤其是在六年前的答案中。 Ack 模式仅适用于侦听器容器。模板在返回之前调用basicAck。如果要拒绝并重新排队消息,则必须在事务中运行 receive() 方法并抛出异常,以便回滚 basicAck 并将消息重新排队。
最近更新 更多