【发布时间】:2018-01-31 13:43:38
【问题描述】:
我试图实现机制,其中 PHP 将消息推送到 RabbitMQ(我不希望 RabbitMQ 直接暴露给用户),RabbitMQ 连接到 RatchetPHP,Ratchet 通过 websocket 连接将其广播给用户.
我遇到的问题是让 Ratchet 服务器同时侦听队列消息并进一步传输它们。 Ratchet 文档假定使用 ZeroMQ,经过长时间搜索不再具有此类方法的过时文档和库(例如React\Stomp)后,我需要对这些解决方案有经验的人提供新的视角。
我拥有的是pusher.php(来自 RabbitMQ 文档的标准示例):
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
只是为了简化复制场景,我还包括 Chat 类:
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
class Chat implements MessageComponentInterface
{
protected $clients;
public function __construct()
{
$this->clients = new \SplObjectStorage;
}
public function onOpen(ConnectionInterface $connection)
{
// Store the new connection to send messages to later
$this->clients->attach($connection);
echo "New connection! ({$connection->resourceId})\n";
}
public function onMessage(ConnectionInterface $from, $msg)
{
$numRecv = count($this->clients) - 1;
echo sprintf('Connection %d sending message "%s" to %d other connection%s'."\n"
, $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');
foreach($this->clients as $client)
{
/** @var \SplObjectStorage $client */
if($from !== $client)
{
// The sender is not the receiver, send to each client connected
$client->send($msg);
}
}
}
public function onClose(ConnectionInterface $conn)
{
// The connection is closed, remove it, as we can no longer send it messages
$this->clients->detach($conn);
echo "Connection {$conn->resourceId} has disconnected\n";
}
public function onError(ConnectionInterface $conn, \Exception $e)
{
echo "An error has occurred: {$e->getMessage()}\n";
$conn->close();
}
}
和 Ratchet server.php(标准 Ratchet 示例和 RabbitMQ 接收器示例):
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Src\Chat;
// RABBIT_RECEIVER
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg)
{
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks))
{
$channel->wait();
}
$channel->close();
$connection->close();
// RABBIT_RECEIVER END
$server = new \Ratchet\App('sockets.dev');
$server->route('/', new Chat());
$server->run();
当前版本基本上是 2 个独立的消息侦听机制,它们单独工作时效果很好(所以那里没有问题),除了它们相互阻塞并且不会在它们之间传输消息。
问题是如何让server.php让RabbitMQ接收消息并插入运行的Ratchet服务器。
【问题讨论】:
标签: php websocket rabbitmq ratchet