这正是 RxJava 的工作方式。
看看this video tutorial,从 12:50 开始。所以给出视频中的例子:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
发生的情况是subscribeOn() 嵌套了所有调用。在这种情况下,subscribeOn(Schedulers.io()) 首先生成,并在 io 线程上订阅它上面的所有内容。但是接下来会生成subscribeOn(Schedulers.newThread()),并且它优先(因为它被最后调用)来订阅它上面的所有内容。没有构建线程链。在此示例中,您实际上是无缘无故地生成 io 线程。
为了更好地处理subscribeOn() 和observeOn() 方法,我建议你看看视频同一作者的this post。他提议的是使用Transformer 来包装对这些方法的调用:
Transformer 实际上只是Func1<Observable<T>, Observable<R>>。在
换句话说:给它一个类型的Observable,它会返回一个
Observable 另一个。这与调用一系列
内联运算符。
这样,你可以有这样的方法:
<T> Transformer<T, T> applySchedulers() {
return observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
或者,如果您想重复使用您的转换器,您可以进行以下设置:
final Transformer schedulersTransformer =
observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {
return (Transformer<T, T>) schedulersTransformer;
}
那么上面的例子应该是这样的:
Observable.just(1, 2, 3)
.compose(applySchedulers())
.subscribe(System.out::println);
希望对您有所帮助。