WebSocket 确实由两个独立的流组成,只是这些流(可能)不在同一个 JVM 上。
您有两个对等点进行通信,一个是服务器,另一个是客户端,但是从已建立的 WebSocket 连接的角度来看,差异不再重要。一个数据流是对等体 1 向对等体 2 发送消息,另一个数据流是对等体 2 向对等体 1 发送消息,然后这两个对等体之间存在网络边界。如果您一次查看一个对等点,则对等点 1 从对等点 2 接收消息,而在第二个流中,对等点 1 正在向对等点 2 发送消息。
每个对等点都有一个接收部分的 Sink 和一个发送部分的 Source。实际上,您确实总共有两个 Sources 和两个 Sinks,只是不在同一个 ActorSystem 上(为了便于解释,假设两个对等点都是在 Akka HTTP 中实现的)。 peer 1 的 Source 连接到 peer 2 的 Sink,peer 2 的 Source 连接到 peer 1 的 Sink。
因此,您编写了一个描述如何处理通过第一个流传入消息的 Sink 和一个描述如何通过第二个流发送消息的 Source。通常,您希望根据接收到的消息生成消息,因此您可以将这两者连接在一起,并通过描述如何响应和传入消息并生成任意数量的传出消息的不同流来路由消息。 Flow[Message, Message, _] 并不意味着您将传出消息转换为传入消息,而是将传入消息转换为传出消息。
webSocketFlow 是一个典型的异步边界,一个代表另一个对等点的流。它将传出消息“转换”为传入消息,方法是将它们发送到另一个对等点并发射其他对等点发送的任何内容。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
这个流是你对等的两个流的一半:
-
[message from other peer] 连接到 printSink
-
helloSource 连接到 [message to the other peer]
传入消息和传出消息之间没有关系,您只需打印收到的所有内容并发送一个“hello world!”。实际上,由于源在一条消息后完成,WebSocket 连接也会关闭,但如果您将源替换为例如Source.repeat,您将不断发送(泛滥,真的)“你好,世界!”无论传入消息的速率如何,都通过网络传输。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
在这里,您获取来自outgoing 的所有信息,这是您要发送的消息,将其路由到webSocketFlow,后者通过与其他对等方通信来“转换”消息并将每个收到的消息生成为@987654332 @。通常,您有一个有线协议,您可以在其中将案例类/pojo/dto 消息编码和解码为有线格式。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
或者您可以想象某种聊天服务器(啊,websockets 和聊天),它向多个客户端广播和合并消息。这应该从任何客户端获取任何消息并将其发送到每个客户端(仅用于演示,未经测试,可能不是您想要的实际聊天服务器):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
希望这会有所帮助。