【问题标题】:Multiple types of messaging queues in php with php-amqplib and rabbitmq [closed]使用 php-amqplib 和 rabbitmq 在 php 中使用多种类型的消息传递队列 [关闭]
【发布时间】:2025-12-13 18:05:01
【问题描述】:

所以,我正在尝试创建一个可以做 4 件事的简单应用。

1) 获取消费者列表(如果我可以获取那些注册的用户 .. 也许我可以在他们加入时以某种方式命名他们,因此它是动态的)。

2) 向一个随机消费者发送“消息”并显示结果

3) 向特定消费者发送“消息”(从上面的列表或预定义列表中获取)并显示结果

4) 向所有消费者发送“消息”并显示来自每个消费者的结果。

该应用程序使用 php-amqplib (https://github.com/videlalvaro/php-amqplib) 在 php 中完成。 rabbitmq 已启动并正在运行,并且似乎可以正常工作(尝试了教程)。

amqp 库的文档对我来说有点奇怪,所以我非常感谢一些示例代码行和所用参数的描述。

【问题讨论】:

  • 你能澄清一下你需要什么样的帮助吗?您可能已经看过 examplesstubs,它们简要介绍了如何与 RabbitMQ 进行通信。
  • 当然,目前我不确定 amqp 文档。他们似乎不是很友好。我已经看过这些示例,但它们实际上也不是所有参数的含义。我需要一些代码来解释我的每种情况
  • 是的,文档已经过时了。 PHP 没有删除某些语言的文档,例如 php.net/manual/pl/book.amqp.php。但是从你所问的看起来你必须花一些时间来了解 RabbitMQ(或者你使用什么 AMQP 代理)。扩展只是对 PHP 的 AMQP 协议的绑定,仅此而已。
  • 是的..我正在混合和匹配,看看我是否能到达任何地方..我想也许有一天我会在尝试这样做的时候有时间告诉我:)哦,好吧。我会回来发布我的结果。
  • 我想我对正在发生的事情有了更多的了解。 fpaste.org/314059/35698191 怎么样?

标签: php rabbitmq amqp php-amqplib


【解决方案1】:

1) 可以通过使用 rabbitmq 的管理 /api/queues 来解决,方法是为每个消费者命名为自身的持久队列名称。

2,3,4,这样解决了:

<?php
//Producer Config
$host = "remote_host";
$port = 5672;
$user = "user";
$pass = "pass";
$array = array("consumer1", "consumer2");

...

<?php
//Producer

if(!isset($argv[1]) || !isset($argv[2])){
        die("Specify a target and a message\n");
}

require_once __DIR__.'/config.php';
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Message\AMQPMessage;

$corr_id = uniqid();

$connection = new AMQPSSLConnection($host, $port, $user, $pass, "/", array("verify_peer" => false));
$channel = $connection->channel();

$response = null;
$onResponse = function ($rep) {
        global $response;
        $response = $rep->body;
        echo " [>] Received: '".$response."'\n";
};

list($callback_queue, ,) = $channel->queue_declare("", false, false, true, false);
$channel->basic_consume($callback_queue, '', false, false, false, false, $onResponse);
$msg = new AMQPMessage($argv[2], array('correlation_id' => uniqid(), 'reply_to' => $callback_queue));

switch($argv[1]){
        case "random":
                $dest = $array[array_rand($array)];
                $type = "direct";
                break;
        case "all":
                $dest = "to_all";
                $type = "fanout";
                break;
        case $argv[1]:
                $dest = $argv[1];
                $type = "direct";
                break;
}

$channel->exchange_declare($dest, $type, false, false, false);
$channel->basic_publish($msg, $dest);
echo " [<] Sent '".$argv[2]."' to '".$dest."'\n";

try {
        if($dest == "to_all"){
                $replies = 0;
                while(!$response || $replies < count($consumers_array)){
                        $channel->wait(null, false, $timeout);
                        $replies++;
                }
        }else{
                while(!$response){
                        $channel->wait(null, false, $timeout);
                }
        }
}catch(PhpAmqpLib\Exception\AMQPTimeoutException $e){
        echo " [x] AMQPTimeoutException thrown\n";
}

$channel->close();
$connection->close();

...

<?php
//Consumer config
$host = "remote_host";
$port = 5672;
$user = "user";
$pass = "pass";
$consumer_name = "consumerX";

...

<?php
//Consumer
require_once __DIR__ . '/config.php';
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPSSLConnection($host, $port, $user, $pass, "/", array('verify_peer' => false));
$channel = $connection->channel();

$channel->exchange_declare('to_all', 'fanout', false, false, false);
$channel->exchange_declare($consumer_name, 'direct', false, false, false);
$channel->queue_declare($consumer_name, false, false, true, false);
$channel->queue_bind($consumer_name, 'to_all');
$channel->queue_bind($consumer_name, $consumer_name);
echo '[*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
        echo " [>] Received: '".$msg->body."'\n";
        $rand = rand(1,3);
        sleep($rand);
        $reply = uniqid()." - slept ".$rand;
        echo " [<] Replied: '".$reply."'\n";
        $raspuns = new AMQPMessage($reply);
        $msg->delivery_info['channel']->basic_publish($raspuns,'',$msg->get('reply_to'));
};

$channel->basic_consume($consumer_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
        $channel->wait();
}

$channel->close();
$connection->close();

不确定这是不是最好的方法,但它让我的“hello world”开始了。

【讨论】:

    最近更新 更多