【发布时间】:2022-01-26 16:07:20
【问题描述】:
给定(无限)事件流,服务器程序必须将事件流式传输到多个客户端。例如:
for _, event := range events {
for _, client := range clients {
client.write(event) // blocking operation
}
}
但是,如果客户端速度较慢,它可能会限制其他客户端。因此,对于每个客户端,我们可以添加一个通道(以及一个使用该通道的客户端特定的 goroutine):
for _, event := range events {
for _, client := range clients {
client.writer <- event // writer channel is consumed by a per-client go routine
}
}
只要缓冲通道未满,就可以。但是,如果通道已满,它将再次阻塞。我可以想到以下选项:
-
在频道已满时删除事件,关闭频道,强制客户端重新连接。这需要重新协商频道位置,或完整的重新流式传输。强制重置可能会浪费资源,使缓慢的客户端更慢
-
添加一个相反的通道,让客户端向服务器发出它已准备好接收的信号。这要求服务器跟踪每个客户端的流位置(也许这还不错?)。在标称(非慢速)情况下,它的同步似乎比必要的多一点,从而增加了延迟。
还有什么? go 的惯用方式是什么?
编辑:这是一个简单、好看但不完善的解决方案:跟踪客户端对象中的流位置,并发送未完成的事件:
for _, event := range events {
persisted_events = append(persisted_events, event)
for _, client := range clients {
for _, event := range persisted_events[client.last_event:] {
select {
case client.writer <- event:
client.last_event++
default:
break
}
}
}
}
这允许慢速客户端赶上,而不会影响其他客户端。它不需要断开连接。但是,它也被破坏了:如果在客户端赶上时事件流停止,则慢速客户端可能会卡住 - 如果等待新事件,则作为主循环。添加触发循环的代码有时效率不高。要求客户端通知主循环它现在处于空闲状态很复杂,并且可能会使事件数量翻倍。
【问题讨论】:
-
参见the gorilla chat example 以获取第一种方法的示例。如果客户端在使用缓冲通道平滑突发后仍然太慢,唯一的选择是减少发送给客户端的数据量。
-
@CeriseLimón:很有趣,我打开了那个例子 :) 但是,它不会重新传输错过的事件,让慢速客户端错过对话的一部分。
-
"如果一个客户端很慢,它会限制其他客户端。因此,对于每个客户端,我们可以添加一个频道:" 不是禁止频道,但我会将其更正为"因此,对于每个客户端,我们可以添加一个goroutine”。并发是处理并发客户端的最简单方法
-
@erenon Re-stream 取决于您的应用程序的详细信息。一般流程是这样的:客户端维护最后一个事件 id 或时间戳。客户端在连接时包含此信息。服务器查询数据库以查找丢失的事件并写入客户端。服务器为客户端订阅事件并开始流式传输新事件。如果客户端无法跟上事件流,那么您必须找到一种方法来减少发送给客户端的数据。
-
没有一个 Go 习惯用法可以比客户端接受消息的速度更快地向客户端发送消息。