【问题标题】:Using side effects in Akka Streams to implement commands received from a websocket在 Akka Streams 中使用副作用来实现从 websocket 接收的命令
【发布时间】:2017-03-10 23:24:44
【问题描述】:

我希望能够单击网站上的按钮,让它代表一个命令,通过 websocket 向我的程序发出该命令,让我的程序处理该命令(这将产生副作用),然后返回将该命令的结果发送到要呈现的网站。

websocket 将负责更新用户视图中不同参与者应用的状态更改。

示例:通过网站更改 AI 指令。这会修改一些值,这些值将被报告回网站。其他用户可能会更改其他 AI 指令,或者 AI 会根据当前条件更改位置做出反应,需要客户端更新屏幕。

我在想我可以让一个参与者负责使用更改的信息更新客户端,并让接收流使用更改更新状态?

这是正确的库吗?有没有更好的方法来实现我想要的?

【问题讨论】:

    标签: websocket akka-stream akka-http


    【解决方案1】:

    您可以为此使用 akka-streams 和 akka-http。使用 Actor 作为处理程序的示例:

    package test
    
    import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status}
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.ws.{Message, TextMessage}
    import akka.http.scaladsl.server.Directives._
    import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}
    import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
    import akka.pattern.pipe
    
    import scala.concurrent.{ExecutionContext, Future}
    import scala.io.StdIn
    
    object Test extends App {
      implicit val actorSystem = ActorSystem()
      implicit val materializer = ActorMaterializer()
      implicit def executionContext: ExecutionContext = actorSystem.dispatcher
    
      val routes =
        path("talk") {
          get {
            val handler = actorSystem.actorOf(Props[Handler])
            val flow = Flow.fromSinkAndSource(
              Flow[Message]
                .filter(_.isText)
                .mapAsync(4) {
                  case TextMessage.Strict(text) => Future.successful(text)
                  case TextMessage.Streamed(textStream) => textStream.runReduce(_ + _)
                }
                .to(Sink.actorRefWithAck[String](handler, Handler.Started, Handler.Ack, Handler.Completed)),
              Source.queue[String](16, OverflowStrategy.backpressure)
                .map(TextMessage.Strict)
                .mapMaterializedValue { queue =>
                  handler ! Handler.OutputQueue(queue)
                  queue
                }
            )
            handleWebSocketMessages(flow)
          }
        }
    
      val bindingFuture = Http().bindAndHandle(routes, "localhost", 8080)
    
      println("Started the server, press enter to shutdown")
      StdIn.readLine()
    
      bindingFuture
        .flatMap(_.unbind())
        .onComplete(_ => actorSystem.terminate())
    }
    
    object Handler {
      case object Started
      case object Completed
      case object Ack
      case class OutputQueue(queue: SourceQueueWithComplete[String])
    }
    
    class Handler extends Actor with Stash {
      import context.dispatcher
    
      override def receive: Receive = initialReceive
    
      def initialReceive: Receive = {
        case Handler.Started =>
          println("Client has connected, waiting for queue")
          context.become(waitQueue)
          sender() ! Handler.Ack
    
        case Handler.OutputQueue(queue) =>
          println("Queue received, waiting for client")
          context.become(waitClient(queue))
      }
    
      def waitQueue: Receive = {
        case Handler.OutputQueue(queue) =>
          println("Queue received, starting")
          context.become(running(queue))
          unstashAll()
    
        case _ =>
          stash()
      }
    
      def waitClient(queue: SourceQueueWithComplete[String]): Receive = {
        case Handler.Started =>
          println("Client has connected, starting")
          context.become(running(queue))
          sender() ! Handler.Ack
          unstashAll()
    
        case _ =>
          stash()
      }
    
      case class ResultWithSender(originalSender: ActorRef, result: QueueOfferResult)
    
      def running(queue: SourceQueueWithComplete[String]): Receive = {
        case s: String =>
          // do whatever you want here with the received message
          println(s"Received text: $s")
    
          val originalSender = sender()
          queue
            .offer("some response to the client")
            .map(ResultWithSender(originalSender, _))
            .pipeTo(self)
    
        case ResultWithSender(originalSender, result) =>
          result match {
            case QueueOfferResult.Enqueued =>   // okay
              originalSender ! Handler.Ack
            case QueueOfferResult.Dropped =>  // due to the OverflowStrategy.backpressure this should not happen
              println("Could not send the response to the client")
              originalSender ! Handler.Ack
            case QueueOfferResult.Failure(e) =>
              println(s"Could not send the response to the client: $e")
              context.stop(self)
            case QueueOfferResult.QueueClosed =>
              println("Outgoing connection to the client has closed")
              context.stop(self)
          }
    
        case Handler.Completed =>
          println("Client has disconnected")
          queue.complete()
          context.stop(self)
    
        case Status.Failure(e) =>
          println(s"Client connection has failed: $e")
          e.printStackTrace()
          queue.fail(new RuntimeException("Upstream has failed", e))
          context.stop(self)
      }
    }
    

    这里有很多地方可以调整,但基本思想保持不变。或者,您可以使用GraphStage 实现handleWebSocketMessages() 方法所需的Flow[Message, Message, _]。上面使用的所有内容也在 akka-streams 文档中进行了详细描述。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-01-09
      • 2016-05-16
      • 1970-01-01
      • 1970-01-01
      • 2022-01-10
      • 2014-09-21
      • 2016-07-14
      • 1970-01-01
      相关资源
      最近更新 更多