【问题标题】:How to execute map, filter, flatMap using multiple threads in RxScala/Java?如何在 RxScala/Java 中使用多个线程执行 map、filter、flatMap?
【发布时间】:2016-02-18 08:35:37
【问题描述】:

如何使用多线程在Observable 上运行filtermapflatMap

  def withDelay[T](delay: Duration)(t: => T) = {
    Thread.sleep(delay.toMillis)
    t
  }

  Observable
    .interval(500 millisecond)
    .filter(x => {
      withDelay(1 second) { x % 2 == 0 }
    })
    .map(x => {
      withDelay(1 second) { x * x }
    }).subscribe(println(_))

目标是使用多个线程同时运行过滤和转换操作。

【问题讨论】:

  • @david.mihola,是的,我检查了它们,并且能够在多个线程中执行 subscribe 块,但是对于 map、flatMap 和 filter,我无法做到这一点。我假设我可能会在过滤或转换时调用其他 API 或从数据库中获取其他数据,因此我想确保此代码将同时执行。

标签: scala concurrency functional-programming rx-java rx-scala


【解决方案1】:

你可以在每个操作上使用 Async.toAsync()。

它在 rxjava-async 包中

Documentation

【讨论】:

    【解决方案2】:

    这将在不同的线程 (rxjava3) 中处理每个集合项。

    var collect = Observable.fromIterable(Arrays.asList("A", "B", "C"))
                          .flatMap(v -> {
                             return Observable.just(v)
                                            .observeOn(Schedulers.computation())
                                            .map(v1 -> {
                                                int time = ThreadLocalRandom.current().nextInt(1000);
                                                Thread.sleep(time);
                                                return String.format("processed-%s", v1);
                                            });
                          })
                          .observeOn(Schedulers.computation())
                          .blockingStream()
                          .collect(Collectors.toList());
    

    【讨论】:

      【解决方案3】:

      您必须使用 observeOn 运算符,它将在设置运算符后定义的特定线程中执行所有下一个运算符

             /**
       * Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
       * Shall print
       * First step main
       * Second step RxNewThreadScheduler-2
       * Third step RxNewThreadScheduler-1
       */
      @Test
      public void testObservableObserverOn() throws InterruptedException {
          Subscription subscription = Observable.just(1)
                  .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                          .getName()))
                  .observeOn(Schedulers.newThread())
                  .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                          .getName()))
                  .observeOn(Schedulers.newThread())
                  .doOnNext(number -> System.out.println( "Third step " + Thread.currentThread()
                          .getName()))
                  .subscribe();
          new TestSubscriber((Observer) subscription)
                  .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
      }
      

      更多异步示例在这里https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

      【讨论】:

      • 是的,但是如果你使用 .doOnNext() 你不能做地图,平面等操作。它只接受动作。
      • 这是一个例子,你应该可以使用任何运算符
      猜你喜欢
      • 1970-01-01
      • 2019-10-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多