【问题标题】:RxJava : Start heartbeat after the last emission til the next one arrivesRxJava:在最后一个发射后开始心跳直到下一个到达
【发布时间】:2020-05-05 16:25:52
【问题描述】:

我正在处理 Http SSE 流,我必须保持 http 连接处于活动状态。为此,我每 15 秒发送一条空消息(名为心跳,其他所有消息均名为真实事件)。

source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat"))) 

此解决方案的缺点是,如果最后一个真实事件在

我想要的是在最后收到的消息超过 15 秒后开始发送心跳事件,并在返回真实事件时停止发送此事件。

类似于以下内容:

---x-x-x-----x----------x---------------h--------- ------h---------------h--x---------------h-x-------- --x---------------h----

  • x 是真实事件
  • h 是心跳事件
  • - 等于 1 秒

任何帮助表示赞赏:)

【问题讨论】:

    标签: rx-java rx-java2


    【解决方案1】:

    我猜switchMap 运营商就是你要找的:

    source
        .map(EventSource.Event::event)
        .startWith(event("").withName("heartbeat"))
        .switchMap (event -> interval(15, TimeUnit.SECONDS)
            .map(t -> event("").withName("heartbeat"))
            .startWith(event)
        )
    

    编辑

    正如你在评论中提到的,即使还没有任何事件,你也需要开始 hearBeat。为此,您需要将一些 Init 类添加到您的 EventSource.Event 中,然后使用 filter() 运算符忽略它。查看编辑后的代码。

    编辑 2

    移除 initialDelay 并使其以 HeartBeat 事件开始

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-19
      • 1970-01-01
      • 2013-03-06
      • 1970-01-01
      • 2016-06-21
      相关资源
      最近更新 更多