【问题标题】:How to flush an Rx buffer onComplete?如何在完成时刷新 Rx 缓冲区?
【发布时间】:2016-12-22 23:32:25
【问题描述】:

我正在使用 RxJava,但可能能够从另一个实现中翻译答案。

我有一个 Observable 发射一系列项目,我想将它们分成 10 个一组。像这样:

observable
    .buffer(10)
    .map(items -> {
        StringBuilder sb = new StringBuilder();
        for (String item : items)
            sb.append(item.document);
        return sb.toString();
    })
    .subscribe(...)

效果很好。但我需要能够刷新不完整的缓冲区。现在,如果我的 observable 在仅发出九个字符串后完成,我的订阅者将永远不会被调用。

有没有办法使用内置运算符来做到这一点,还是我必须创建一个自定义观察者来在观察onComplete 时进行缓冲?

谢谢!

更新:经过进一步调查,问题出在上游,我有一些自定义的 observable 在测试中运行良好,但连接在一起时会损坏。我基本上是这样做的:

Observable.never()
   .mergeWith(Observable.range(1,8))
   .buffer(5)

我还没有完全理解为什么,但是never() 阻止了部分批次的处理。然而,这很好用:

Observable.empty()
   .mergeWith(Observable.range(1,8))
   .buffer(5)

我去学习。感谢您的帮助!

【问题讨论】:

  • 缓冲区应该在完成之前发出最终的不完整列表。你如何观察这个序列,你的来源是什么?

标签: rx-java reactive-programming


【解决方案1】:

我不知道您为什么不从缓冲区发出最后的项目。 缓冲区将发出一组项目,但它会发出所有这些。

检查这个例子

@Test
public void stringBuffer() {
    Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    Observable.from(numbers)
            .map(number -> "uniqueKey=" + number )
            .buffer(5)
            .map(ns -> String.join("&", ns))
            .subscribe(System.out::println);

}

即使最后一个数字 10 是组中唯一的元素,它也会发出。

您可以在此处查看更多缓冲区示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableBuffer.java

还可以查看Window 运算符,可能在您的用例中效果更好

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableWindow.java

【讨论】:

  • 你说得对……经过进一步调查,问题出在我的可观察链上。我已经为问题的根本原因添加了更详细的解释。谢谢!不过有一件事:您的示例不会产生部分批次。将数字数组更改为仅转到 8 会是一个更好的示例(因此最后一批只有 3 个项目)。
猜你喜欢
  • 2015-08-10
  • 1970-01-01
  • 2010-09-20
  • 1970-01-01
  • 2021-09-03
  • 2018-07-11
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多