【发布时间】:2017-02-17 18:19:41
【问题描述】:
我在 RxJava2 上相对较新,我遇到了一些奇怪的行为,所以我很可能以错误的方式使用该工具。
这是一个相当大的项目,但我已将下面的 sn-p 分离为最小可重现代码:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
此代码使用.interval 模拟来自我的rx 流的事件,flatMap 接收这些事件“进行一些处理”并使用Subject 将结果推送到流中。
流是一个持续的过程,将有几个事件。
这个最小代码很愚蠢,因为我只推送 apply 回调,但在实际情况下,有几个可能的时刻可能会发生推送,并且在 apply 期间接收到的事件数量不是将通过主题发送的相同金额。
我希望通过这段代码看到的是:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
我实际得到的是:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
我也试过有一个Observable<Integer> o = s.share(); 并返回它,或者直接返回s.share(); 得到相同的结果。
我有点理解为什么会这样。 ObservableSource 再次被订阅 n 再次 n ,因此每个循环都有更多事件。
问题:
如何实现我的预期行为?
(如果我的预期行为不清楚,请在 cmets 上询问更多信息)
【问题讨论】:
-
如果我在你身边,我会尝试将
private Subject<Integer> s = PublishSubject.create();移动到不同的范围 -
@Blackbelt 正如我所说。这是最小的可重现代码。在完整的代码中,
Function有自己的类。 -
我可以判断我看到的内容,我猜不出你写的内容。
标签: java android rx-java rx-java2