【发布时间】:2019-04-19 19:07:12
【问题描述】:
我正在寻找一种将多个订阅者附加到 RxJava Observable 流的方法,每个订阅者异步处理发出的事件。
我首先尝试使用 .flatMap() ,但这似乎不适用于任何后续订阅者。所有订阅者都在同一个线程上处理事件。
.flatMap(s -> Observable.just(s).subscribeOn(Schedulers.newThread()))
最终的工作是通过每次创建一个新的 Observable 来消耗一个新线程中的每个事件:
Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
.subscribe(j -> {
Observable.just(j)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
});
});
输出:
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-2=>2
s1=>RxNewThreadScheduler-3=>3
还有多个订阅者的最终结果:
ConnectableObservable<String> e = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
.publish();
e.subscribe(j -> {
Observable.just(j)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
});
});
e.subscribe(j -> {
Observable.just(j)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
});
});
e.connect();
输出:
s2=>RxNewThreadScheduler-4=>2
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-3=>2
s2=>RxNewThreadScheduler-6=>3
s2=>RxNewThreadScheduler-2=>1
s1=>RxNewThreadScheduler-5=>3
但是,这似乎有点笨拙。是否有更优雅的解决方案,或者 RxJava 不是一个很好的用例?
【问题讨论】:
标签: java multithreading concurrency rx-java