【发布时间】:2016-12-22 23:32:25
【问题描述】:
我正在使用 RxJava,但可能能够从另一个实现中翻译答案。
我有一个 Observable 发射一系列项目,我想将它们分成 10 个一组。像这样:
observable
.buffer(10)
.map(items -> {
StringBuilder sb = new StringBuilder();
for (String item : items)
sb.append(item.document);
return sb.toString();
})
.subscribe(...)
效果很好。但我需要能够刷新不完整的缓冲区。现在,如果我的 observable 在仅发出九个字符串后完成,我的订阅者将永远不会被调用。
有没有办法使用内置运算符来做到这一点,还是我必须创建一个自定义观察者来在观察onComplete 时进行缓冲?
谢谢!
更新:经过进一步调查,问题出在上游,我有一些自定义的 observable 在测试中运行良好,但连接在一起时会损坏。我基本上是这样做的:
Observable.never()
.mergeWith(Observable.range(1,8))
.buffer(5)
我还没有完全理解为什么,但是never() 阻止了部分批次的处理。然而,这很好用:
Observable.empty()
.mergeWith(Observable.range(1,8))
.buffer(5)
我去学习。感谢您的帮助!
【问题讨论】:
-
缓冲区应该在完成之前发出最终的不完整列表。你如何观察这个序列,你的来源是什么?
标签: rx-java reactive-programming