【问题标题】:Ratchet PHP WAMP - React / ZeroMQ - Specific user broadcastRatchet PHP WAMP - React / ZeroMQ - 特定用户广播
【发布时间】:2013-10-04 15:19:43
【问题描述】:

注意:这与使用MessageComponentInterfacethis question 相同。我改用WampServerInterface,所以这个问题专门针对那个部分。我需要一个带有代码示例和解释的答案,因为我可以看到这对将来的其他人有帮助。

为单个用户尝试循环推送

我正在使用 Ratchet 和 ZeroMQ 的 WAMP 部分,我目前有一个工作版本的 push integration tutorial

我正在尝试执行以下操作:

  • zeromq 服务器已启动并运行,准备记录订阅者和取消订阅者
  • 用户通过 websocket 协议在浏览器中进行连接
  • 一个循环被启动,它将数据发送给请求它的特定用户
  • 当用户断开连接时,该用户数据的循环将停止

我的第 (1) 和 (2) 点有效,但我遇到的问题是第三点:

首先:我怎样才能只向每个特定用户发送数据? 广播将其发送给每个人,除非“主题”最终可能是个人用户 ID?

其次:我有一个很大的安全问题。如果我要从客户端发送想要订阅的用户 ID,这似乎是我需要的,那么用户可以更改变量到另一个用户的 ID 并返回他们的数据。

第三点:我必须运行一个单独的 php 脚本,其中包含 zeromq 的代码以开始实际循环。我不确定这是最好的方法,我宁愿让它完全在代码库中工作,而不是单独的 php 文件。这是我需要整理的一个主要领域。

以下代码显示了我目前拥有的内容。

刚刚从控制台运行的服务器

我按字面意思输入php bin/push-server.php 来运行它。订阅和取消订阅将输出到此终端以进行调试。

$loop   = React\EventLoop\Factory::create();
$pusher = Pusher;

$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onMessage'));

$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\WebSocket\WsServer(
        new Ratchet\Wamp\WampServer(
            $pusher
        )
    ),
    $webSock
);

$loop->run();

通过 websocket 发送数据的 Pusher

我省略了无用的东西,专注于 onMessage()onSubscribe() 方法。

public function onSubscribe(ConnectionInterface $conn, $topic) 
{
    $subject = $topic->getId();
    $ip = $conn->remoteAddress;

    if (!array_key_exists($subject, $this->subscribedTopics)) 
    {
        $this->subscribedTopics[$subject] = $topic;
    }

    $this->clients[] = $conn->resourceId;

    echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);
}

public function onMessage($entry) {
    $entryData = json_decode($entry, true);

    var_dump($entryData);

    if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
        return;
    }

    $topic = $this->subscribedTopics[$entryData['topic']];

    // This sends out everything to multiple users, not what I want!!
    // I can't send() to individual connections from here I don't think :S
    $topic->broadcast($entryData);
}

开始循环使用上述 Pusher 代码的脚本

这是我的问题 - 这是一个单独的 php 文件,希望将来可以集成到其他代码中,但目前我不确定如何正确使用它。我是否从会话中获取用户 ID?我仍然需要从客户端发送它...

// Thought sessions might work here but they don't work for subscription
session_start();
$userId = $_SESSION['userId'];

$loop   = React\EventLoop\Factory::create();

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");

$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {

   $entryData = array(
       'topic'     => 'subscriptionTopicHere',
       'userId'    => $userId
    );
    $i++;

    // So it doesn't go on infinitely if run from browser
    if ($i >= 3)
    {
        $loop->stop();
    }

    // Send stuff to the queue
    $socket->send(json_encode($entryData));
});

最后是要订阅的客户端js

$(document).ready(function() { 

    var conn = new ab.Session(
        'ws://localhost:8080' 
      , function() {            
            conn.subscribe('topicHere', function(topic, data) {
                console.log(topic);
                console.log(data);
            });
        }
      , function() {          
            console.warn('WebSocket connection closed');
        }
      , {                       
            'skipSubprotocolCheck': true
        }
    );
});

结论

以上是可行的,但我真的需要弄清楚以下几点:

  • 如何向个人用户发送个人消息?当他们访问在 JS 中启动 websocket 连接的页面时,我是否也应该启动将内容推入 PHP 队列的脚本(zeromq)?这就是我目前正在手动执行的操作,只是感觉不对

  • 从 JS 订阅用户时,从会话中获取用户 ID 并从客户端发送它是不安全的。这可能是伪造的。请告诉我有一个更简单的方法,如果是,如何?

【问题讨论】:

    标签: php websocket zeromq ratchet reactphp


    【解决方案1】:

    要发送给特定用户,您需要 ROUTER-DEALER 模式而不是 PUB-SUB。这在指南第 3 章中进行了解释。如果您使用的是 ZMQ v4.0,安全性是在线路级别处理的,因此您在应用程序中看不到它。它仍然需要一些工作,除非您使用提供身份验证框架 (zauth) 的 CZMQ 绑定。

    基本上,要进行身份验证,您需要在 inproc://zeromq.zap.01 上安装一个处理程序,并通过该套接字响应请求。用于 RFC 的 Google ZeroMQ ZAP;在核心 libzmq/tests/test_security_curve.cpp 程序中还有一个测试用例。

    【讨论】:

      【解决方案2】:

      注意:我在这里的回答不包括对 ZeroMQ 的引用,因为我不再使用它了。但是,如果需要,我相信您将能够弄清楚如何在此答案中使用 ZeroMQ。

      使用 JSON

      首先,要订阅的主题的Websocket RFCWAMP Spec 状态必须是字符串。我在这里有点作弊,但我仍然遵守规范:我正在传递 JSON。

      {
          "topic": "subject here",
          "userId": "1",
          "token": "dsah9273bui3f92h3r83f82h3"
      }
      

      JSON 仍然是一个字符串,但它允许我传递更多数据来代替“主题”,并且 PHP 在另一端执行 json_decode() 很简单。当然,您应该验证您是否确实收到了 JSON,但这取决于您的实现。

      那么我在这里经过了什么,为什么?

      • 主题

      主题是用户订阅的主题。您可以使用它来决定将哪些数据传回给用户。

      • 用户 ID

      显然是用户的 ID。您必须使用下一部分验证此用户是否存在并被允许订阅:

      • 令牌

      这应该是一个一次性随机生成的令牌,在您的 PHP 中生成,并传递给 JavaScript 变量。当我说“一次使用”时,我的意思是每次重新加载页面时(以及,通过扩展,在每个 HTTP 请求上),你的 JavaScript 变量都应该有一个新的令牌。此令牌应根据用户 ID 存储在数据库中。

      然后,一旦发出 websocket 请求,您将令牌和用户 ID 与数据库中的内容进行匹配,以确保用户确实是他们所说的人,并且他们没有弄乱 JS 变量。

      注意:在您的事件处理程序中,您可以使用$conn->remoteAddress 来获取连接的 IP,因此如果有人试图恶意连接,您可以阻止他们(记录他们或其他什么)。

      为什么会这样?

      之所以有效,是因为每次有新连接通过时,唯一令牌可确保任何用户都无法访问其他任何人的订阅数据。

      服务器

      这是我用于运行循环和事件处理程序的内容。我正在创建循环,创建所有装饰器样式的对象,并将循环传入我的 EventHandler(我很快就会讲到)。

      $loop = Factory::create();
      
      new IoServer(
          new WsServer(
              new WampServer(
                  new EventHandler($loop) // This is my class. Pass in the loop!
              )
          ),
          $webSock
      );
      
      $loop->run();
      

      事件处理程序

      class EventHandler implements WampServerInterface, MessageComponentInterface
      {
          /**
           * @var \React\EventLoop\LoopInterface
           */
          private $loop;
      
          /**
           * @var array List of connected clients
           */
          private $clients;
      
          /**
           * Pass in the react event loop here
           */
          public function __construct(LoopInterface $loop)
          {
              $this->loop = $loop;
          }
      
          /**
           * A user connects, we store the connection by the unique resource id
           */
          public function onOpen(ConnectionInterface $conn)
          {
              $this->clients[$conn->resourceId]['conn'] = $conn;
          }
      
          /**
           * A user subscribes. The JSON is in $subscription->getId()
           */
          public function onSubscribe(ConnectionInterface $conn, $subscription)
          {
              // This is the JSON passed in from your JavaScript
              // Obviously you need to validate it's JSON and expected data etc...
              $data = json_decode(subscription->getId());
              
              // Validate the users id and token together against the db values
              
              // Now, let's subscribe this user only
              // 5 = the interval, in seconds
              $timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) {
                  $data = "whatever data you want to broadcast";
                  return $subscription->broadcast(json_encode($data));
              });
      
              // Store the timer against that user's connection resource Id
              $this->clients[$conn->resourceId]['timer'] = $timer;
          }
      
          public function onClose(ConnectionInterface $conn)
          {
              // There might be a connection without a timer
              // So make sure there is one before trying to cancel it!
              if (isset($this->clients[$conn->resourceId]['timer']))
              {
                  if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface)
                  {
                      $this->loop->cancelTimer($this->clients[$conn->resourceId]['timer']);
                  }
              }
          
              unset($this->clients[$conn->resourceId]);
          }
      
          /** Implement all the extra methods the interfaces say that you must use **/
      }
      

      基本上就是这样。这里的要点是:

      • 唯一令牌、用户 ID 和连接 ID 提供了确保一个用户无法看到另一个用户的数据所需的唯一组合。
      • 唯一令牌意味着如果同一用户打开另一个页面并请求订阅,他们将拥有自己的连接 ID + 令牌组合,因此同一用户不会在同一页面上拥有双倍的订阅(基本上,每个连接有自己的个人数据)。

      扩展

      在对数据进行任何操作之前,您应该确保所有数据都经过验证,而不是黑客攻击。使用Monolog 之类的内容记录所有连接尝试,并在出现任何严重问题时设置电子邮件转发(例如服务器停止工作,因为有人是混蛋并试图破解您的服务器)。

      结束点

      • 验证一切。我怎么强调都不过分。您在每次请求时都会更改的唯一令牌重要
      • 请记住,如果您在每个 HTTP 请求上重新生成令牌,并且在尝试通过 websockets 连接之前发出 POST 请求,则必须在尝试连接之前将重新生成的令牌传回 JavaScript (否则您的令牌将无效)。
      • 记录一切。记录连接、询问什么主题和断开连接的每个人。 Monolog 非常适合这一点。

      【讨论】:

      • 聚会迟到了,但这个答案真的帮了我大忙。谢谢。
      • 在 json 中使用 userId 是一个非常糟糕的主意,因为您可以更改值并订阅其他人的频道。您可以改为使用他的会话注册用户并将其附加到数组中,然后发送给该特定用户
      • @Sekai 但是任何改变客户端的东西都行不通,因为用户ID和每次请求都会改变的唯一令牌之间会有不匹配的服务器端。在服务器端,您正在根据用户 ID 检查唯一的“一次性”请求令牌。你错过了还是我忘记了?
      • 在服务器端创建令牌也可以,但无论如何,有更简单的方法,请阅读这里socketo.me/docs/sessions
      • 如果您想在此处添加另一个答案,展示如何使用 Sessions 部分,我很乐意为您投票,因为它可能对未来的用户有用,也许对我也有用:-)跨度>
      猜你喜欢
      • 2016-02-03
      • 1970-01-01
      • 1970-01-01
      • 2012-07-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-14
      • 2016-01-02
      相关资源
      最近更新 更多