【问题标题】:RxJava concurrency with multiple subscribers and events具有多个订阅者和事件的 RxJava 并发
【发布时间】:2019-04-19 19:07:12
【问题描述】:

我正在寻找一种将多个订阅者附加到 RxJava Observable 流的方法,每个订阅者异步处理发出的事件。

我首先尝试使用 .flatMap() ,但这似乎不适用于任何后续订阅者。所有订阅者都在同一个线程上处理事件。

.flatMap(s -> Observable.just(s).subscribeOn(Schedulers.newThread()))

最终的工作是通过每次创建一个新的 Observable 来消耗一个新线程中的每个事件:

Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .subscribe(j -> {
                Observable.just(j)
                        .subscribeOn(Schedulers.newThread())
                        .subscribe(i -> {
                            try {
                                Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
                        });
            });

输出:

s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-2=>2
s1=>RxNewThreadScheduler-3=>3

还有多个订阅者的最终结果:

ConnectableObservable<String> e = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .publish();

    e.subscribe(j -> {
        Observable.just(j)
                .subscribeOn(Schedulers.newThread())
                .subscribe(i -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
                });
    });

    e.subscribe(j -> {
        Observable.just(j)
                .subscribeOn(Schedulers.newThread())
                .subscribe(i -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
                });
    });

    e.connect();

输出:

s2=>RxNewThreadScheduler-4=>2
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-3=>2
s2=>RxNewThreadScheduler-6=>3
s2=>RxNewThreadScheduler-2=>1
s1=>RxNewThreadScheduler-5=>3

但是,这似乎有点笨拙。是否有更优雅的解决方案,或者 RxJava 不是一个很好的用例?

【问题讨论】:

    标签: java multithreading concurrency rx-java


    【解决方案1】:

    使用.flatMap(s -&gt; Observable.just(s).observeOn(Schedulers.newThread())....)

    【讨论】:

    • 我最初尝试过这个(.observeOn 和 .subscribeOn),但是每当我在订阅者中引入一个花费超过几毫秒的计算时,例如Thread.sleep(100),一切最终都在同一个线程上再次处理。
    【解决方案2】:

    如果我正确理解了 rx-contract,那么您正在尝试做一些违反它的事情。

    让我们看看合同

    RxJava Observable 的契约是事件( onNext() , onCompleted() , onEr ror() ) 永远不能同时发出。换句话说,单个 Observable 流必须始终是序列化和线程安全的。每个事件都可以从一个 不同的线程,只要排放不是并发的。这意味着没有交互 离开或同时执行 onNext() 。如果 onNext() 仍在执行 一个线程,另一个线程不能再次开始调用它(交错)。 --Tomasz Nurkiewicz in Reactive Programming with RxJava

    在我看来,您试图通过在外部订阅中使用嵌套订阅来破坏合同。对订阅者的 onNext 调用不再序列化。

    为什么不将“异步”工作负载从订阅者转移到 flatMap-operator 并订阅新的 observable:

        ConnectableObservable<String> stringObservable = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
                .flatMap(s -> {
                    return Observable.just(s).subscribeOn(Schedulers.computation());
                })
                .publish();
    
        stringObservable
                .flatMap(s -> {
                    // do More asyncStuff depending on subscription
                    return Observable.just(s).subscribeOn(Schedulers.newThread());
                })
                .subscribe(s -> {
                    // use result here
                });
    
        stringObservable
                .subscribe(s -> {
                    // use immediate result here.
                });
    
        stringObservable.connect();
    

    【讨论】:

    • 我相信合同适用于 Observable 而不是订阅者。我不是想用并发事件创建单个 Observable 流。但相反,我试图让每个事件都有多个不相关的订阅者。由于它们不相关,我想在不同的线程中处理它们以利用并行化。在我当前的用例中,订户进行了一系列不需要任何未来处理的网络调用,因此我不确定将其移至 .flatMap() 运算符会获得什么好处。
    • “我不想创建一个带有并发事件的单个 Observable 流。”实际上,您正在尝试这样做。由于发布/连接,您只有一个将从订阅中调用 onNext 的流。 onNext 调用是序列化的。让我们看一下订阅者接口:void onNext(String s)。您应该为每个订阅创建一个新的 observable,它将调用您在 flatMap 中的操作并订阅并订阅该特定的操作。
    【解决方案3】:

    flatMap 以及 Observable 内的 doOnNext 上的 flatMap 将产生与您相同的输出。

    onNext() 总是以顺序方式调用,因此在flatMap 之后使用doOnNext 也不适合您。由于同样的原因,在最终的 subscribe 中编写操作在您的情况下不起作用。

    以下代码是使用 RxJava2 编写的。在 RxJava 版本 1 中,您必须在 Thread.sleep 周围添加 try-catch 块。

    ConnectableObservable<String> e = Observable.just("1", "2", "3").publish();
    
    e.flatMap(
          s -> Observable.just(s)
              .subscribeOn(Schedulers.newThread())
              .doOnNext(i -> {  // <<<<<<
                  Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                  System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
              }))
      .subscribe();
    
    e.flatMap(
          s -> Observable.just(s)
              .subscribeOn(Schedulers.newThread())
              .doOnNext(i -> {  // <<<<<<
                  Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                  System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
              }))
      .subscribe();
    
    e.connect();
    

    【讨论】:

      【解决方案4】:

      您可以通过 Flowable 和并行来实现它:

              Flowable.fromIterable(Arrays.asList("1", "2", "3"))
                      .parallel(3)
                      .runOn(Schedulers.newThread())
                      .map(item -> {
                          try {
                              Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                          } catch (InterruptedException e1) {
                              e1.printStackTrace();
                          }
                          System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + item);
      
                          return Completable.complete();
                      })
              .sequential().subscribe();
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-03-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-06-27
        • 1970-01-01
        相关资源
        最近更新 更多