【问题标题】:Broadcasting messages in Play Framework WebSockets在 Play Framework WebSockets 中广播消息
【发布时间】:2014-08-25 22:01:13
【问题描述】:

我正在使用Concurrent.unicast[JsValue] 在 Play Framework WebSockets 中推送消息,并且我想优化向多个用户发送相同的消息。是否可以使用多个Concurrent.Channel 广播消息?

【问题讨论】:

  • Concurrent.broadcast 做你想做的事吗?
  • Broadcast 会把这条消息发给大家,如果我错了,请纠正我。例如,我想向 25 人中的 5 人发送数据
  • 您必须在枚举器上应用过滤器。如果消息包含有关收件人的任何信息,这将起作用。

标签: scala websocket playframework-2.0 playframework-2.2 iterate


【解决方案1】:

简答

为每个用户维护单独的频道,并有与用户关联的组

长答案

package controllers

import akka.actor.Actor
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.iteratee.Concurrent
import play.api.Logger
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext

object AdvancedRoomMessages {
  case class Join(name: String)
  case class BroadcastGroup(msg: String, gName: String)
  case class BroadcastAll(msg: String)
  case class AddGroup(gName: String)
  case class RemoveGroup(gName: String)
  case class AddUserToGroup(userName: String, gName: String)
  case class removeUserFromGroup(userName: String, gName: String)
}


class AdvancedRoom extends Actor {
  import scala.collection.mutable._


  /**
   * common channel for communication
   */


  val (enumerator, channel) = Concurrent.broadcast[String]



  /**
   * every user has his own channel
   */
  val users = Map[String, (Enumerator[String],Channel[String])]()

  /**
   * users can be grouped
   */
  val groups = Map[String, Option[Set[String]]]()

  import AdvancedRoomMessages._

  def receive = {
    case Join(name) => {
      /**
       * join request from the user
       */
      if(users contains name) {
        /**
         * existing user
         */
        val iteratee = Iteratee.ignore[String]
        sender ! ((iteratee, users(name)._1))
      }else {
        /**
         * join request from a new user
         */

        /**
         * create new broadcast channel
         */
        val (enumerator, channel) = Concurrent.broadcast[String]
        users += ((name, (enumerator, channel)))
        val iteratee = Iteratee.foreach[String](msg => {
          //do something with the message
        }).map{ _ => {
          /**
           * user closed his websocket client, so remove the user
           * warning ... also remove the corresponding user name in groups
           */
          users(name)._2.eofAndEnd()
          users -= name
        }}
        sender ! (iteratee, enumerator)
      }
    }
    case BroadcastGroup(msg, gName) => {
      groups(gName) match {
        case Some(gMates) => {
          gMates.foreach { person => users(person)._2.push(msg)}
        }
        case None => Logger.info("empty group") //ignore sending message
      }
    }
    case BroadcastAll(msg) => {
      channel push msg
    }
    case AddGroup(gName: String) => {
      groups += ((gName, None))
    }
    case RemoveGroup(gName: String) => {
      groups -= gName
    }
    case AddUserToGroup(userName, gName) => {
      groups(gName) match {
        case Some(gMates) => gMates += userName
        case None => Set(userName)
      }
    }
  }
}

【讨论】:

    【解决方案2】:
    def filter(group_id: String) = Enumeratee.filter[JsValue]{  json: JsValue =>
    
      group_id == (json\"group_id").as[String]
    }
    

    此过滤器必须应用为,

    def chat(group_id: String) = WebSocket.using[JsValue] { request =>
    
        val in = Iteratee.foreach[JsValue]{ msg=>
    
        public_channel.push(msg)
    
        }
    
        (in, public_enumerator &> filter(group_id))
    }
    

    【讨论】:

    • 但是如果我有 10.000 个用户并且我正在向其中的 20 个发送消息,那么我必须检查所有这 10.000 个消息,这对我来说不是很优化,也许有一些方法组合频道或类似的东西
    【解决方案3】:

    根据我的实验,Concurrent.broadcast 不会发送给所有人(可能是一些不幸的命名?) 这是我使用的按预期工作的方法。

    package controllers
    
    import play.api._
    import play.api.mvc._
    import play.api.libs.iteratee.Concurrent
    import play.api.libs.iteratee.Iteratee
    import play.api.libs.concurrent.Execution.Implicits.defaultContext
    import scala.collection.mutable.{Set => MS}
    import scala.concurrent._ 
    
    object Application extends Controller {
      val c:MS[(Int, Concurrent.Channel[String])] = MS() // (channelID, Channel))
    
      def pushHello = c.foreach(_._2.push("hello")) // push to ALL channels
    
      def index = WebSocket.async[String] { _ => future{
            val (out,channel) = Concurrent.broadcast[String]          
            val channelID = scala.util.Random.nextInt
            c.add((channelID, channel))
            val in = Iteratee.foreach[String] {
              _ match {
                case any => channel.push("received:"+any) // push to current channel
              }
            }.map { _ => c.retain(x => x._1 != channelID) }
            (in, out)
        }
      }
    }
    

    【讨论】:

      【解决方案4】:

      简答


      val (enumerator, channel) = Concurrent.broadcast[String]
      
      use above thing globally
      

      长答案


      package controllers
      
      import play.api._
      import play.api.mvc._
      import play.libs.Akka
      import play.api.libs.concurrent.Execution.Implicits.defaultContext
      import play.api.libs.iteratee.Iteratee
      import play.api.libs.iteratee.Enumerator
      import akka.actor.Props
      import akka.pattern.ask
      import akka.util.Timeout
      
      import Room._
      
      import scala.concurrent.duration._
      
      object Application extends Controller {
      
        def index = Action {
          Ok(views.html.index("Your new application is ready."))
        }
      
        /**
         * get actor ref
         */
        val room = Akka.system.actorOf(Props[Room])
      
        /**
         * websocket action
         */
        def chat(name: String) = WebSocket.async[String](implicit request => {
          implicit val timeout = Timeout(1 seconds)
      
          (room ? Join(name)).mapTo[(Iteratee[String, _], Enumerator[String])]
        })
      }
      
      
      
      //Here is the actor
      package controllers
      
      import akka.actor.Actor
      import play.api.libs.iteratee.Concurrent
      import play.api.libs.iteratee.Iteratee
      import play.api.libs.concurrent.Execution.Implicits.defaultContext
      
      object Room {
        case class Join(name: String)
        case class Broadcast(msg: String)
        case object Quit
      }
      
      class Room extends Actor {
      
        /**
         * here is the meat 
         * Creating channel globally is important here
         * This can be accessed across all cases in receive method
         * pushing the message into this channel and returning this enumerator to all ,
         * broadcasts the message 
         */
        val (enumerator, channel) = Concurrent.broadcast[String]
      
        /**
         * keep track of users
         */
        val users = scala.collection.mutable.Set[String]()
      
        import Room._
      
        def receive = {
          case Join(name) => {
            /**
             * add new users
             */
            if(!users.contains(name)) {
              users += name
      
              val iteratee = Iteratee.foreach[String]{
                msg => {
                  /**
                   * process messages from users
                   * here we are broadcast it to all other users
                   */
                  self ! Broadcast(msg)
                }
              }.map( _ => {
                /**
                 * user closed his websocket. 
                 * remove him from users
                 */
                users -= name
              })
      
              /**
               * send iteratee, enumerator pair to the sender of join message
               */
              sender ! (iteratee, enumerator)
            } else {
      
              /**
               * already added users
               */
              val iteratee = Iteratee.ignore[String]
      
              /**
               * send iteratee and enumerator pair
               */
              sender ! (iteratee, enumerator)
            }
          }
      
          case Broadcast(msg) => channel push(msg)
      
          /**
           * close the common channel only when actor is stopped
           */
          case Quit => channel eofAndEnd(); context.stop(self)
        }
      }
      

      【讨论】:

      • 这将向所有用户发送消息,如何仅向部分用户发送数据,如果我有一个 10.000 并且我只想发送给 100,分别向 100 发送相同的消息怎么办这不是最有效的方法,这就是为什么我正在寻找一些广播解决方案。
      猜你喜欢
      • 1970-01-01
      • 2018-04-02
      • 2013-09-09
      • 1970-01-01
      • 1970-01-01
      • 2013-03-11
      • 1970-01-01
      • 2017-07-04
      • 2011-09-07
      相关资源
      最近更新 更多