【问题标题】:RxJava Observables not ParallelRxJava Observables 不是并行的
【发布时间】:2017-10-25 17:06:03
【问题描述】:

我是 RxJava 新手。

我正在执行一个基本代码:

public class App {

    public static void main(String... args) throws Exception {
        long startTime = System.currentTimeMillis();

        abcd().map(cnt -> cnt).subscribe((s) -> System.out.println(s));

        abcd().map(cnt -> cnt).subscribe(s -> System.out.println(s));

        long endTime = System.currentTimeMillis();
        long diff = endTime - startTime;
        System.out.println(diff);

    }

    public static Observable<Integer> abcd() {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            System.out.println();
        }
        Observable<Integer> r = Observable.fromArray(10);
        return r;
    }
}

基本上创建了两个 Observables 并且都需要 1 秒的处理时间。 并且运行这段代码的总时间超过 2 秒,这意味着两个 Observable 没有并行执行。

如何更改我的代码,以便总执行时间为 1 秒,这意味着我的两个 observable 应该并行执行。请将答案发布到 RxJava。

【问题讨论】:

    标签: rx-java2


    【解决方案1】:

    您甚至在 Observables 被创建之前就在睡觉,因此它与 Observable 在发出结果之前进行处理并不完全相同。

    您可以将abcd() 方法的执行推迟到后台线程并等待两个子流终止:

    Observable.merge(
        Observable.defer(() -> abcd())
           .subscribeOn(Schedulers.io())
           .map(cnt -> cnt)
           .doOnNext(System.out::println),
        Observable.defer(() -> abcd())
           .subscribeOn(Schedulers.io())
           .map(cnt -> cnt + 1)
           .doOnNext(System.out::println)
    )
    .blockingSubscribe(ignored -> { }, Throwable::printStackTrace);
    

    【讨论】:

      猜你喜欢
      • 2014-12-02
      • 1970-01-01
      • 2017-11-21
      • 2019-08-13
      • 1970-01-01
      • 2017-09-07
      • 1970-01-01
      • 1970-01-01
      • 2017-01-12
      相关资源
      最近更新 更多