【问题标题】:Receiving RabbitMQ messages in PHPRatchet在 PHPRatchet 中接收 RabbitMQ 消息
【发布时间】:2018-01-31 13:43:38
【问题描述】:

试图实现机制,其中 PHP 将消息推送到 RabbitMQ(我不希望 RabbitMQ 直接暴露给用户),RabbitMQ 连接到 RatchetPHP,Ratchet 通过 websocket 连接将其广播给用户.

我遇到的问题是让 Ratchet 服务器同时侦听队列消息并进一步传输它们。 Ratchet 文档假定使用 ZeroMQ,经过长时间搜索不再具有此类方法的过时文档和库(例如React\Stomp)后,我需要对这些解决方案有经验的人提供新的视角。

我拥有的是pusher.php(来自 RabbitMQ 文档的标准示例):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

只是为了简化复制场景,我还包括 Chat 类:

use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

class Chat implements MessageComponentInterface
{
    protected $clients;

    public function __construct()
    {
        $this->clients = new \SplObjectStorage;
    }

    public function onOpen(ConnectionInterface $connection)
    {
        // Store the new connection to send messages to later
        $this->clients->attach($connection);

        echo "New connection! ({$connection->resourceId})\n";
    }

    public function onMessage(ConnectionInterface $from, $msg)
    {
        $numRecv = count($this->clients) - 1;
        echo sprintf('Connection %d sending message "%s" to %d other connection%s'."\n"
            , $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');

        foreach($this->clients as $client)
        {
            /** @var \SplObjectStorage $client */
            if($from !== $client)
            {
                // The sender is not the receiver, send to each client connected
                $client->send($msg);
            }
        }
    }

    public function onClose(ConnectionInterface $conn)
    {
        // The connection is closed, remove it, as we can no longer send it messages
        $this->clients->detach($conn);

        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        echo "An error has occurred: {$e->getMessage()}\n";

        $conn->close();
    }
}

和 Ratchet server.php(标准 Ratchet 示例和 RabbitMQ 接收器示例):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Src\Chat;

// RABBIT_RECEIVER
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg)
{
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks))
{
    $channel->wait();
}

$channel->close();
$connection->close();
// RABBIT_RECEIVER END

$server = new \Ratchet\App('sockets.dev');
$server->route('/', new Chat());

$server->run();

当前版本基本上是 2 个独立的消息侦听机制,它们单独工作时效果很好(所以那里没有问题),除了它们相互阻塞并且不会在它们之间传输消息。

问题是如何让server.php让RabbitMQ接收消息并插入运行的Ratchet服务器。

【问题讨论】:

    标签: php websocket rabbitmq ratchet


    【解决方案1】:

    我想通了,所以为后代添加了答案。解决方案不是完全实时的,但它非常接近,似乎具有良好的性能并且对于 Ratchet websocket 服务器是非阻塞的。解决方案采用 Ratchet 自定义 loopaddPeriodicTimer 方法。

    所以server.php 的代码应该是这样的:

    $loop = React\EventLoop\Factory::create();
    $chat = new Chat($loop);
    
    $server = new \Ratchet\App('sockets.dev', 8080, '127.0.0.1', $loop);
    $server->route('/', $chat);
    
    $server->run();
    

    还有Chat.php类(只有__constructor,因为其余的都是一样的):

    public function __construct(LoopInterface $loop)
    {
        $this->loop = $loop;
        $this->clients = new \SplObjectStorage();
    
        $this->loop->addPeriodicTimer(0, function ()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
            $channel = $connection->channel();
            $channel->queue_declare('hello', false, false, false, false);
            echo ' [*] Checking for for messages from RabbitMQ', "\n";
    
            $max_number_messages_to_fetch_per_batch = 10000;
            do
            {
                $message = $channel->basic_get('hello', true);
                if($message)
                {
                    foreach($this->clients as $client)
                    {
                        $client->send($message->body);
                    }
    
                    $max_number_messages_to_fetch_per_batch--;
                }
            }
            while($message && $max_number_messages_to_fetch_per_batch > 0);
    
            $channel->close();
            $connection->close();
        });
    
    }
    

    启动 Ratchet 服务器将附加周期性事件(例如每 0 秒),它将检查 RabbitMQ 是否有新消息并处理它们。

    为了更好的性能控制,允许 websocket 喘口气,一批处理的来自 RabbitMQ 的消息数量被限制为 10k。已处理的消息从队列中移除,下一批将在下一次迭代中处理。

    您还可以使用addPeriodicTimer 间隔参数微调更新频率。 0 秒是您将获得的最接近实时的时间,但您可能不需要它,可以将其更改为更高的值。

    【讨论】:

    • 嗨,我也在做同样的事情,但我无法同时连接两者。此外,在哪里编写代码以将消息推送到rabbitMq。在上述解决方案中,消息(在 Chat.php 构造方法中)是从 rabitMQ 接收的。
    • @Manu:是的,我的案例是关于从 rabbitMQ 获取消息到 Ratchet。据我记得,向 rabbitMQ 发送消息应该容易得多,因为您可以在 Chat->onMessage 方法中实现它。只需连接到rabbitMQ,推送您的消息并关闭连接(您在参考pusher.php的部分中的原始问题中有来自文档的示例)。
    • 谢谢,我们可以在constructor 方法中检查邮件没有传递给发件人吗?
    • 我做的完全一样,我会像你一样发布我的实现来帮助别人
    猜你喜欢
    • 2017-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多