【问题标题】:Stream events to potentially slow clients将事件流式传输到可能较慢的客户端
【发布时间】:2022-01-26 16:07:20
【问题描述】:

给定(无限)事件流,服务器程序必须将事件流式传输到多个客户端。例如:

for _, event := range events {
  for _, client := range clients {
    client.write(event) // blocking operation
  }
}

但是,如果客户端速度较慢,它可能会限制其他客户端。因此,对于每个客户端,我们可以添加一个通道(以及一个使用该通道的客户端特定的 goroutine):

for _, event := range events {
  for _, client := range clients {
    client.writer <- event // writer channel is consumed by a per-client go routine
  }
}

只要缓冲通道未满,就可以。但是,如果通道已满,它将再次阻塞。我可以想到以下选项:

  • 在频道已满时删除事件,关闭频道,强制客户端重新连接。这需要重新协商频道位置,或完整的重新流式传输。强制重置可能会浪费资源,使缓慢的客户端更慢

  • 添加一个相反的通道,让客户端向服务器发出它已准备好接收的信号。这要求服务器跟踪每个客户端的流位置(也许这还不错?)。在标称(非慢速)情况下,它的同步似乎比必要的多一点,从而增加了延迟。

还有什么? go 的惯用方式是什么?

编辑:这是一个简单、好看但不完善的解决方案:跟踪客户端对象中的流位置,并发送未完成的事件:

for _, event := range events {
  persisted_events = append(persisted_events, event)
  for _, client := range clients {
    for _, event := range persisted_events[client.last_event:] {
      select {
      case client.writer <- event:
        client.last_event++
      default:
        break
      }
    }
  }
}

这允许慢速客户端赶上,而不会影响其他客户端。它不需要断开连接。但是,它也被破坏了:如果在客户端赶上时事件流停止,则慢速客户端可能会卡住 - 如果等待新事件,则作为主循环。添加触发循环的代码有时效率不高。要求客户端通知主循环它现在处于空闲状态很复杂,并且可能会使事件数量翻倍。

【问题讨论】:

  • 参见the gorilla chat example 以获取第一种方法的示例。如果客户端在使用缓冲通道平滑突发后仍然太慢,唯一的选择是减少发送给客户端的数据量。
  • @CeriseLimón:很有趣,我打开了那个例子 :) 但是,它不会重新传输错过的事件,让慢速客户端错过对话的一部分。
  • "如果一个客户端很慢,它会限制其他客户端。因此,对于每个客户端,我们可以添加一个频道:" 不是禁止频道,但我会将其更正为"因此,对于每个客户端,我们可以添加一个goroutine”。并发是处理并发客户端的最简单方法
  • @erenon Re-stream 取决于您的应用程序的详细信息。一般流程是这样的:客户端维护最后一个事件 id 或时间戳。客户端在连接时包含此信息。服务器查询数据库以查找丢失的事件并写入客户端。服务器为客户端订阅事件并开始流式传输新事件。如果客户端无法跟上事件流,那么您必须找到一种方法来减少发送给客户端的数据。
  • 没有一个 Go 习惯用法可以比客户端接受消息的速度更快地向客户端发送消息。

标签: go channel


【解决方案1】:

迄今为止我发现的最佳设置:

for {
    var event Event = nil
    select {
    case event, ok := <-events:
        if !ok {
            return
        }
        persisted_events = append(persisted_events, event)
    case event <- ticker:
    }

    for _, client := range clients {
        select {
        case client.writer <- persisted_events[client.num_events:]:
            client.num_events = len(persisted_events)
        default:
        }
    }
}

这是问题中损坏示例的轻微变化。有两个主要区别:

  • 编写器通道获取事件片段(而不是单个事件)
  • 有一个自动收录机,即使没有新事件,它也会定期唤醒循环

结合起来,即使慢速客户端在长时间的静默期之前错过最后一个事件后立即赶上,最终赶上也只会延迟一个时间段,而不是结合代码周期加上错过的事件数。因此,减少代码周期可以限制尾部延迟。在性能方面并不理想,但很简单(在并发方面)。

【讨论】:

    猜你喜欢
    • 2015-11-29
    • 1970-01-01
    • 1970-01-01
    • 2012-02-10
    • 2014-09-26
    • 1970-01-01
    • 1970-01-01
    • 2014-12-21
    • 1970-01-01
    相关资源
    最近更新 更多