【问题标题】:Project Reactor - Parallel ExecutionProject Reactor - 并行执行
【发布时间】:2021-08-23 12:45:56
【问题描述】:

我有以下 Flux,

@Test
public void fluxWithRange_CustomTest() {
    Flux<Integer> intFlux = Flux.range(1, 10).flatMap(i -> {
        if (i % 2 == 0) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return Mono.just(i);
        } else {
            return Mono.just(i);
        }
    }, 2).subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();

    StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).verifyComplete();
}

我希望它可以并行运行,但是,它只在 1 个线程中执行。

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    subscribeOn 方法仅提供一种在“某人”订阅您的 Flux 时将执行移至不同线程的方法。这意味着当您使用 StepVerifier 时,您订阅了通量,并且因为您定义了调度程序,所以执行被移动到调度程序提供的线程之一。这并不意味着 Flux 将在多个线程之间跳转。

    您所期望的行为可以通过添加第二个 subscribeOn 来存档,但是您在 flatMap 中使用的 Mono。当 flatMap 现在订阅内容时,它将使用另一个线程。

    如果您将代码更改为以下内容:

      @Test
      public void fluxWithRange_CustomTest() throws InterruptedException {
        Flux<Integer> intFlux = Flux.range(1, 10)
          .flatMap(i -> subFlux(i),2)
          .subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();
    
        StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8,9,10).verifyComplete(); //This now fails.
    
      }
    
      private Mono<Integer> subFlux(int i) {
        Mono<Integer> result = Mono.create(sink ->
        {
          if (i % 2 == 0) {
            try {
              Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
          }
          sink.success(i);
        });
        return result.subscribeOn(Schedulers.newBoundedElastic(2, 2, "other"));
      }
    

    【讨论】:

      猜你喜欢
      • 2022-01-02
      • 2018-09-26
      • 2021-12-22
      • 2018-07-24
      • 2021-07-14
      • 1970-01-01
      • 2019-05-29
      • 2019-03-17
      • 2017-05-18
      相关资源
      最近更新 更多