【问题标题】:Spring Webflux / Reactor release calling threadSpring Webflux / Reactor 发布调用线程
【发布时间】:2019-07-31 07:05:56
【问题描述】:

所以我知道 Spring WebFlux 和 Reactor 在后台使用 netty for nio,现在我想释放调用线程以释放资源以处理更多请求。下面的简单代码是释放调用线程吗?

@GetMapping("/foo")
public Mono<Void> bar() {

  someService.veryLongSyncOperation();

  return Mono.empty();
}

我没有将服务调用包装在 Flux/Mono 中,我只是想首先验证调用线程是否在服务完成长时间工作时被释放。这足以实现调用线程释放吗?如果是这样,有没有办法测试这个?

我在想框架看到了返回类型,这足以让它知道它必须释放调用线程。

【问题讨论】:

    标签: netty reactive-programming spring-webflux project-reactor


    【解决方案1】:

    您可以使用.subscribeOn(Schedulers.elastic()),正如reactor reference guide 中提到的那样

    @GetMapping("/foo")
    public Mono<Void> bar() {
        return Mono.fromCallable(() -> someService.veryLongSyncOperation())
                .subscribeOn(Schedulers.elastic())
                .then();
    }
    

    每个订阅都将发生在来自 Schedulers.elastic() 的专用单线程工作线程上。

    更新: 现在有Schedulers.boundedElastic() 调度器。我建议默认使用它。

    【讨论】:

      【解决方案2】:

      没有。在这种情况下,您正在调用 Netty IO 线程中长时间运行的进程。 我能想到的最简单的方法是 create 一个 Mono 接收器并在新线程中运行长操作(或者可能通过线程池)。操作成功完成后调用sink.success(),如果失败则调用sink.error(x)传递抛出的异常。

      @GetMapping("/foo")
      public Mono<Void> bar() {
          return Mono.create(sink -> {
              new Thread(() -> {
                  try {
                      someService.veryLongSyncOperation();
                      sink.success();
                  } catch (Exception ex) {
                      sink.error(ex);
                  }
              }).start();
          });  
      }
      

      调用线程在设置完流程后立即返回,WebFlux 会订阅返回的 Mono 触发线程在新线程中运行。

      【讨论】:

        猜你喜欢
        • 2020-09-12
        • 2022-06-05
        • 2021-07-30
        • 2018-09-09
        • 1970-01-01
        • 1970-01-01
        • 2020-06-16
        • 2018-09-19
        • 1970-01-01
        相关资源
        最近更新 更多