【问题标题】:How to keep my incoming websocket connection open all the time?如何保持我传入的 websocket 连接一直打开?
【发布时间】:2021-12-02 18:59:57
【问题描述】:

我使用此示例代码客户端连接到我的 websocket 服务,但目前它只是连接然后关闭。

如何保持此连接打开且永不关闭?

一旦建立连接,我希望它保持打开状态,直到我关闭应用程序。

package docs.http.scaladsl

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Github 参考:https://github.com/akka/akka-http/blob/v10.2.6/docs/src/test/scala/docs/http/scaladsl/WebSocketClientFlow.scala

更新 我有与上面相同的代码,除了我更新了我的源代码如下:

val source1 = Source.single(TextMessage("""{"action":"auth","params":"APIKEY_123"}"""))
val source2 = Source.single(TextMessage("""{"action":"subscribe","params":"topic123"}"""))

val sources: Source[Message, NotUsed] =
  Source.combine(source1, source2, Source.maybe)(Concat(_))

所以我可以看到我的 source1 和 source2 被发送到 websocket,但是 websocket 并没有按应有的方式开始流式传输它的值,它只是挂起。

不知道我做错了什么......

【问题讨论】:

    标签: scala akka-stream akka-http


    【解决方案1】:

    The Akka docs call out your situation:

    Akka HTTP WebSocket API 不支持半关闭连接,这意味着如果任一流完成,整个连接将关闭(在交换“关闭握手”或 3 秒超时之后)。

    在您的情况下,outgoing(作为Source.single)在发出TextMessage 后立即完成。 webSocketFlow 收到完成消息,然后断开连接。

    解决方案是在outgoing 完成时延迟,甚至可能永远延迟它(或至少直到应用程序被终止)。

    在您不想通过 websocket 发送消息的情况下,两个标准源对于延迟完成可能很有用。

    • Source.maybe 具体化为Promise,您可以使用可选的终止消息来完成它。除非承诺完成,否则它不会完成。

    • Source.never 永远不会完成。您可以通过不填写 Source.maybe 来实现这一点,但这比这要少。

    那么它在代码中会是什么样子?

    val outgoing =
      Source.single(TextMessage("hello world!"))
        .concat(Source.never)
    

    对于Source.maybe,您需要.concatMat,以便Promise 可用于完成;这确实意味着您将获得类似 val (completionPromise, upgradeResponse, closed) 的东西作为整体物化值:

    val outgoing =
      Source.single(TextMessage("hello world!"))
        .concatMat(Source.maybe[TextMessage])(Keep.right)
    
    val ((completionPromise, upgradeResponse), closed) =
      outgoing
        .viaMat(websocketFlow)(Keep.both)
        .toMat(incoming)(Keep.both)
        .run()
    

    在想通过socket发送任意多条消息的情况下,Source.actorRef或者Source.queue很方便:发送消息给物化actor ref通过websocket连接发送(发送特殊消息完成源)或offer 消息到队列,然后complete 它。

    val outgoing =
      Source.actorRef[TextMessage](
        completionMatcher = {
          case Done =>
            CompletionStrategy.draining // send the messages already sent before completing
        },
        failureMatcher = PartialFunction.empty,
        bufferSize = 100,
        overflowStrategy = OverflowStrategy.dropNew
      )
    
    val ((sendToSocketRef, upgradeResponse), closed) =
      outgoing
        .viaMat(websocketFlow)(Keep.both)
        .toMat(incoming)(Keep.both)
        .run()
    
    sendToSocketRef ! TextMessage("hello world!")
    

    【讨论】:

    • 是的,作为演员的 Source 将是完美的,这样我就可以从我的代码的各个部分触发消息。
    猜你喜欢
    • 1970-01-01
    • 2013-04-17
    • 1970-01-01
    • 2019-07-19
    • 1970-01-01
    • 1970-01-01
    • 2016-01-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多