【问题标题】:How delete messages from an AMQP (RabbitMQ) queue?如何从 AMQP (RabbitMQ) 队列中删除消息?
【发布时间】:2013-07-25 12:03:07
【问题描述】:

就是这样。

我正在使用 PHP AMQP 从 Rabbitmq 读取结果队列,以便处理发送的每封电子邮件的重要信息。完成此操作后,我需要删除该消息或将该消息标记为已写入,以便下次读取队列时,我不会收到已处理的消息。

由于 Rabbitmq 服务器每小时发送超过 10.000 封电子邮件,每次我读取队列以处理结果发送时,脚本可以运行至少 5 分钟以处理队列中的所有消息,所以在它之后完成后,在这 5 分钟内发送了数百条新消息。这使我无法在脚本完成后清除队列,因为它会删除脚本运行期间未处理的消息位置。

这让我只有一个选择。在我的 AMQP 脚本处理或读取消息后标记或删除消息。

有没有办法做到这一点? (这是脚本)

<?php
/**
 *  result.php
 *  Script that connects to RabbitMQ, and takes the result message from
 *  the result message queue.
 */

// include the settings
 require_once('settings.php');

// try to set up a connection to the RabbitMQ server
try
{
    // construct the connection to the RabbitMQ server
    $connection = new AMQPConnection(array(
        'host'      =>  $hostname,
        'login'     =>  $username,
        'password'  =>  $password,
        'vhost'     =>  $vhost
    ));

    // connect to the RabbitMQ server
    $connection->connect();
}
catch (AMQPException $exception)
{
    echo "Could not establish a connection to the RabbitMQ server.\n";
}

// try to create the channel
try
{
    // open the channel
    $channel = new AMQPChannel($connection);
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost (creating channel).\n";
}

// try to create the queue
try
{
    // create the queue and bind the exchange
    $queue   = new AMQPQueue($channel);
    $queue->setName($resultbox);
    $queue->setFlags(AMQP_DURABLE);
    $queue->bind('exchange1', 'key1');
    $queue->declare();
}
catch (AMQPQueueException $exception)
{
    echo "Channel is not connected to a broker (creating queue).\n";
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost. (creating queue)/\n";
}

// Get the message from the queue. 
while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
}
    $queue->purge();

// done, close the connection to RabbitMQ
$connection->disconnect();
?>

【问题讨论】:

    标签: php queue rabbitmq amqp mail-server


    【解决方案1】:

    在成功处理后确认消息$queue-&gt;ack(),甚至使用AMQP_AUTOACK 标志使用/获取它们。

    UPD:

    根据您的代码:

    1.确认消息
    while ($envelope = $queue->get()) {
        //Function that processes the message
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
    }
    
    2. 使用AMQP_AUTOACK 标志获取它:
    while ($envelope = $queue->get(AMQP_AUTOACK)) {
        //Function that processes the message
        process_message($envelope->getBody());
    }
    

    附注:

    查看AMQPQueue::consume 文档,看起来这里更适合。

    3. 消息处理完毕后,可以进行消费和确认:
    $queue->consume(function ($envelope, $queue) {
            process_message($envelope->getBody());
            $queue->ack($envelope->getDeliveryTag());
    });
    
    4. 或使用AMQP_AUTOACK 标志消费,但当处理失败时,您将无法再次处理消息:
    $queue->consume(function ($envelope, $queue) {
            process_message($envelope->getBody());
            $queue->ack($envelope->getDeliveryTag());
    }, AMQP_AUTOACK);
    

    结论:我建议使用#3 解决方案,但这取决于您。

    【讨论】: