【问题标题】:Repeat last element in Flux if no elements are available upstream如果上游没有可用的元素,则重复 Flux 中的最后一个元素
【发布时间】:2019-12-17 08:59:27
【问题描述】:

Flux 的订阅者发出onNext 信号但发布者没有提供新元素时,我正在寻找一种方法来重复最后一个元素。

当然,这种方法在逻辑上会引入急切的流式传输,但在我的情况下,这正是我想要的,类似于 onBackpressureDrop 和其他在上游请求无限需求的方法。

我需要完全相反 - 我的订阅者比发布者更快。

【问题讨论】:

  • 请记住,Reactive Streams 不保证 when 元素在收到Publisher 后由Publisher 发出,因此@987654326 没有办法@ 以“预览”Publisher 是否会接受其新提出的请求,而不是接收onComplete 信号。您可以在概念上在每个请求之后开始倒计时并将最后一个值重播到下游,但是如果上游值到达,您需要正确处理它,例如。在说倒计时之后(排队,如果有需求就传播它,等等......),所以这可能是一个自定义运算符

标签: reactive-programming project-reactor


【解决方案1】:

我很难想到这样一种情况,即订阅者只需将最后发出的值缓存在自身内部并在那里执行它需要做的事情(无论是循环、在预定的执行程序上触发还是其他什么),这并不是更好的选择完全)而不是故意对Flux发出的最后一个值有无限的要求。

类似于以下可能的东西可以工作,但是非常hacky(话虽如此,我想不出更好的方法):

flux.subscribe(str -> {
    Mono.just(str).repeat().takeUntilOther(flux.next())
            .subscribe(s -> {
                //Actual subscriber
            });
});

【讨论】:

  • 我理解您对原因的担忧。为了解释原因,这只是一个偏好问题——我总是会选择更实用的方法,就是这样。
  • 我认为这里删除了一条评论。那是指subscribe的使用吗?
猜你喜欢
  • 2019-10-12
  • 2018-04-09
  • 2011-08-31
  • 2021-03-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-08
  • 1970-01-01
相关资源
最近更新 更多