【问题标题】:Akka-http: connect to websocket on localhostAkka-http:连接到本地主机上的 websocket
【发布时间】:2017-12-17 20:21:36
【问题描述】:

我正在尝试通过本地主机上的 websocket 连接到某些服务器。当我尝试在 JS 中通过

ws = new WebSocket('ws://localhost:8137');

成功了。但是,当我使用 akka-http 和 akka-streams 时,出现“连接失败”错误。

object Transmitter {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    import system.dispatcher

    object Rec extends Actor {
        override def receive: Receive = {
            case TextMessage.Strict(msg) =>
                Log.info("Recevied signal " + msg)
        }
    }

    //  val host = "ws://echo.websocket.org"
    val host = "ws://localhost:8137"

    val sink: Sink[Message, NotUsed] = Sink.actorRef[Message](system.actorOf(Props(Rec)), PoisonPill)


    val source: Source[Message, NotUsed] = Source(List("test1", "test2") map (TextMessage(_)))


    val flow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
        Http().webSocketClientFlow(WebSocketRequest(host))

    val (upgradeResponse, closed) =
        source
        .viaMat(flow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(sink)(Keep.both) // also keep the Future[Done]
        .run()

    val connected: Future[Done.type] = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
            Future.successful(Done)
        } else {
            Future.failed(new Exception(s"Connection failed: ${upgrade.response.status}")
        }
    }

    def test(): Unit = {
        connected.onComplete(Log.info)
    }
}

使用 ws://echo.websocket.org 完全可以正常工作。

我认为附加我的服务器的代码是没有道理的,因为它适用于 JavaScript 客户端并且问题仅在于连接,但是如果你想看它,我可以展示它。

我做错了什么?

【问题讨论】:

    标签: scala websocket akka-stream akka-http


    【解决方案1】:

    我已经使用来自akka documentation 的 websocket 服务器测试了您的客户端实现, 我没有收到任何连接错误。您的 websocket 客户端连接成功。这就是为什么我猜测问题出在您的服务器实现上。

    object WebSocketServer extends App {
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      import Directives._
    
      val greeterWebSocketService = Flow[Message].collect {
        case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
      }
    
      val route =
        get {
          handleWebSocketMessages(greeterWebSocketService)
        }
    
      val bindingFuture = Http().bindAndHandle(route, "localhost", 8137)
    
      println(s"Server online at http://localhost:8137/\nPress RETURN to stop...")
      StdIn.readLine()
    
      import system.dispatcher // for the future transformations
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }
    

    顺便说一句,我注意到您的演员的接收方法并未涵盖所有可能的消息。根据that akka issue, 每条消息,即使是非常小的消息,也可能以Streamed 结尾。如果您想打印所有文本消息,则该角色的更好实现将是:

    object Rec extends Actor {
      override def receive: Receive = {
        case TextMessage.Strict(text)             ⇒ println(s"Received signal $text")
        case TextMessage.Streamed(textStream)     ⇒ textStream.runFold("")(_ + _).foreach(msg => println(s"Received streamed signal: $msg"))
      }
    }
    

    请在my github 上找到一个工作项目。

    【讨论】:

    • 非常感谢您的工作。尽管它没有完全回答我的问题,但我注意到当我启动您的服务器时,另一台仍在运行。是的,在确切的端口上。问题是一个在 127.0.0.1 上运行,另一个在 ::1
    • 在您的示例中,以这种方式实现传入的流式消息是非阻塞的吗?
    【解决方案2】:

    我找到了解决方案:我使用的服务器在 IPv6 上运行(作为 ::1),但 akka-http 将 localhost 视为 127.0.0.1 并忽略 ::1。我不得不重写服务器以强制它使用 IPv4 并且它起作用了。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-10-28
      • 1970-01-01
      • 2020-09-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-17
      相关资源
      最近更新 更多