【问题标题】:php rabbitmq consumer reconnectphp rabbitmq 消费者重新连接
【发布时间】:2023-07-14 11:26:01
【问题描述】:

我有一个使用 RabbitMQ 的 PHP 应用程序。为了冗余,我创建了一对 RabbitMQ 服务器并将它们加入到一个集群中。我还有一个运行 HAProxy 的 VyOS 故障转移集群来负载平衡连接并在发生故障转移时提供重新路由。

昨天,我们的 VyOS 集群决定需要进行故障转移(可能是短暂的网络中断)。 HAproxy 在虚拟 IP 移动的一个 VyOS 上停止,并在另一个节点上重新启动 HAproxy。

之后,我查看了 Rabbit 中的队列,发现每个队列的消费者为零。我检查了运行消费者的机器仍然在运行 PHP。我离开了他们一段时间,看看他们是否会重新连接,但他们没有。我不得不杀死 PHP 脚本并重新启动它们,它们重新连接并立即开始使用。

我认为 RabbitMQ 和 HAproxy 正在按预期工作...现在我需要 PHP 使用者来支持故障转移事件...换句话说,它不仅需要挂起,还需要检测断开连接并自动重新连接。

这是我的 RabbitMQ 课程。提前感谢您的帮助!

<?php
while(true)
{
    try{getMessages("transcode2");}
    catch(Exception $e){echo($e->getMessage()."\n");}
    sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
    global $channel;
    $msg=json_encode($msg);
    $queue="transcode2";
    $channel->queue_declare($queue,true,false,false,false);
    $channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
    global $connection,$channel;
    $connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
    $channel=$connection->channel();
    $channel->queue_declare($queue,true,false,false,false);
    $callback=function($msg)
    {
        if(handleMessage(json_decode($msg->body,true)))
        {
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
        else
        {
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
        }
    };
    $channel->basic_qos(null,1,null);
    $channel->basic_consume($queue,'',false,false,false,false,$callback);
    while(count($channel->callbacks))
    {
        try{$channel->wait();}
        catch(Exception $e)
        {
            break;
        }
    }
    $channel->close();
    $connection->close();
}
?>

【问题讨论】:

    标签: php rabbitmq haproxy


    【解决方案1】:

    如果你对 $channel->wait(); 使用 timeout 参数,它可能会起作用

    【讨论】:

    • 我改变了 $channel->wait();到 $channel->wait(null,false,60);它起作用了,但值得注意的是它没有留在 while(count($channel->callbacks)) 循环中,它退出了 getmessage 函数,关闭了连接并在一秒钟后重新启动整个连接,当 getmessages 在顶部运行时while 循环。我想它可以工作,即使有很多重新连接开销......
    【解决方案2】:

    零超时无法正常工作,因为您说代理可能会关闭连接而 PHP 使用者没有发现它。

    解决方案是不使用零接收超时。确保连接超时大于接收超时。

    这是一个基于AMQP Interop的例子:

    安装 AMQP Interop 兼容传输,例如:

    composer require enqueue/amqp-bunny
    

    该代码与您明确设置的超时执行相同的操作:

    <?php
    use Enqueue\AmqpBunny\AmqpConnectionFactory;
    use Interop\Amqp\AmqpConsumer;
    use Interop\Amqp\AmqpMessage;
    use Interop\Amqp\AmqpQueue;
    
    $context = (new AmqpConnectionFactory(sprintf(
        'amqp://%s:%s@%s:%s/%2f?connection_timeout=600', // 10 min
        RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT
    )))->createContext();
    
    $context->setQos(null,1,null);
    
    //sendMessage
    
    $queue = $context->createQueue("transcode2");
    $queue->addFlag(AmqpQueue::FLAG_PASSIVE);
    $context->declareQueue($queue);
    
    $message = $context->createMessage(json_encode($msg));
    $message->setPriority($prio);
    
    $producer = $context->createProducer();
    $producer->send($queue, $message);
    
    // getMessages
    
    $consumer = $context->createConsumer($queue);
    $context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
        if(handleMessage(json_decode($message->getBody(), true))) {
            $consumer->acknowledge($message);
        } else {
            $consumer->reject($message);
        }
    
        return true;
    });
    
    $receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now.
    
    $context->consume($receiveTimeout);
    

    【讨论】: