【问题标题】:Consuming not acknowledge messages from RabbitMq使用来自 RabbitMq 的不确认消息
【发布时间】:2024-01-19 05:13:01
【问题描述】:

我创建了一个简单的发布者和一个使用basic.consume 订阅队列的消费者。

当作业无异常运行时,我的消费者会确认消息。每当我遇到异常时,我都不会确认消息并提前返回。只有确认的消息会从队列中消失,所以它工作正常。
现在我希望消费者再次接收失败的消息,但重新使用这些消息的唯一方法是重新启动消费者。

我需要如何处理这个用例?

设置代码

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');

消费者代码

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

生产者代码

$exchange->publish('message');

【问题讨论】:

  • 你使用哪种语言,你能提供一些代码吗?
  • @zaq178miami,查看我编辑的消息
  • @Bram_Gerritsen,查看我的答案更新

标签: php rabbitmq amqp


【解决方案1】:

如果消息未被确认并且应用程序失败,它将自动重新传递,并且信封上的redelivered 属性将设置为true(除非您使用no-ack = true 标志使用它们)。

UPD:

你必须 nack 在你的 catch 块中带有重新投递标志的消息

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

当心无限的 nacked 消息,而在 RabbitMQ 和 AMQP 协议中根本没有实现重新传递计数。

如果您不想弄乱此类消息并且只是想添加一些延迟,您可能想在nack 方法调用之前添加一些sleep()usleep(),但这根本不是一个好主意.

有多种技术可以处理循环重新交付问题:

1.依赖Dead Letter Exchanges

  • 优点:可靠、标准、清晰
  • 缺点:需要额外的逻辑

2。使用per message or per queue TTL

  • 优点:易于实施、标准、清晰
  • 缺点:排长队可能会丢失一些消息

示例(注意,对于队列 ttl,我们只传递数字,对于消息 ttl - 任何数字字符串):

2.1 每条消息 ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2.每个队列 ttl:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3.在消息正文或标头中保留重新发送次数或剩余重新发送次数(也称为跳数限制或 IP 堆栈中的 ttl)

  • 优点:在应用程序级别为您提供对消息生命周期的额外控制
  • 缺点:当您必须修改消息并再次发布时,开销很大,特定于应用程序,不清楚

代码:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);

可能还有其他一些方法可以更好地控制消息重新传递流程。

结论:没有灵丹妙药。您必须决定哪种解决方案最适合您的需求或找到其他解决方案,但不要忘记在此处分享;)

【讨论】:

  • 感谢您的回答。 redelivered 确实设置为 true,但我必须重新启动阻塞消费者才能重新使用该消息。
  • 谢谢,这正是我所需要的。你能给我一些指示/建议如何防止无限重新传递的消息吗?如果我可以将请求队列延迟给定的秒数,那就太好了,这样我的消费服务器就不会超载。
  • 感谢 chrystal clear 的更新,对于其他遇到类似问题的人来说,这将是一个非常好的资源。我已经开始实施使用 DLX 并让它现在工作。它的行为正是我想要的。
  • 死信交换 ;)
【解决方案2】:

如果你不想重启消费者,那么basic.recover AMQP 命令可能就是你想要的。根据AMQP protocol

basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 

【讨论】:

最近更新 更多