【问题标题】:Is it OK to share observables between multiple clients using SSE?可以使用 SSE 在多个客户端之间共享 observables 吗?
【发布时间】:2019-06-14 15:05:01
【问题描述】:

我有一个服务(ServiceA),其端点可以订阅,订阅后,该服务使用服务器发送的事件连续生成数据。

如果这很重要,我正在使用带有 Java 的 Project Reactor。

这可能很重要,所以我将解释这个端点的作用。每 15 秒它从另一个服务 (ServiceB) 获取数据,检查它在 15 秒前获取的数据是否有一些变化,如果有,它会用这个数据产生一个新事件,如果没有变化,它不会发送任何东西(因此到客户端的有效负载尽可能小)。

现在,此应用程序可以同时连接多个客户端,并且它们都要求相同的数据 - 它不会被用户过滤等。

这个产生输出的 observable 在多个客户端之间共享是否明智?

当然,它会为我们节省很多对 ServiceB 的不必要调用,但我想知道这种方法是否有任何禁忌症 - 这是我第一次在后端编写反应式程序(来自 RxJS)和我不知道这是否会导致任何并发问题或任何其他类型的问题。

我可以看到的另一个好处是,连接的新客户端将立即收到来自 ServiceB 的最后接收数据(每次调用通常需要大约 4 秒来检索此数据)。

我还想知道这个可观察对象是否有可能仅在有一些订阅者时才调用 ServiceB - 即直到至少有一个订阅者,调用服务,如果没有订阅者停止调用它,当一个新的订阅者订阅者再次调用它,但首先获取客户端最后获取的数据(无论它有多旧或陈旧)。

【问题讨论】:

  • 不清楚 ServiceA 是位于同一个 JVM 实例中还是只能通过网络访问。以及“服务器发送事件”的工作原理。
  • ServiceB 只能通过网络访问。服务器发送事件基本上是无限的数据流。只要与客户端的连接仍然存在,它将每 15 秒传输一次。

标签: java rxjs reactive-programming project-reactor


【解决方案1】:

您的 SSE 源可以使用以下模式完美共享:

source.publish().refCount();

请注意,您需要存储该调用的返回值并将同一实例返回给后续调用者,以便共享发生。

一旦所有订阅者都取消订阅,refCount 也将取消其对原始来源的订阅。之后,第一个加入的订阅者将触发对 source 的新订阅,您应该制作它,以便它获取最新数据并每 15 秒重新初始化一次轮询周期。

【讨论】:

    猜你喜欢
    • 2013-06-20
    • 2010-11-05
    • 1970-01-01
    • 2020-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-11
    • 1970-01-01
    相关资源
    最近更新 更多