【发布时间】:2019-08-19 08:54:51
【问题描述】:
我分别订阅了两个PublishSubjects 和两个Observers。在PubslishSubject 和Subscriber 之间,我使用observeOn(Schedulers.single()) 从[main] 线程切换到[RxSingleScheduler] 线程。我开始在循环内的两个PublishSubjects 上发布(PubslishSubject.onNext())。
publishSubject1.onNext("next");
publishSubject2.onNext("next");
我的预期是两个订阅者的运行顺序与发布排放的顺序相同,但我得到的结果完全不同。 Subsriber1 处理它的所有排放,然后 Subscriber2 处理它的所有排放。 我预计排放计划并按发布顺序运行。有什么办法可以实现吗?
import java.util.concurrent.TimeUnit;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
public class ObserveOnApp {
public static void main(String[] args) {
PublishSubject<String> publishSubject1 = PublishSubject.create();
PublishSubject<String> publishSubject2 = PublishSubject.create();
publishSubject1
.observeOn(Schedulers.single())
.subscribe(next -> {
System.out.println("Subscriber1");
});
publishSubject2
.observeOn(Schedulers.single())
.subscribe(next -> {
System.out.println("Subscriber2");
});
for (int i= 0; i < 10; i++) {
publishSubject1.onNext("next");
publishSubject2.onNext("next");
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
【问题讨论】:
-
请不要在 SO 和 RxJava 问题网站上cross post您的问题。谢谢。
标签: rx-java reactive-programming rx-java2