【问题标题】:Waiting for running Reactor Mono instances to complete等待运行 Reactor Mono 实例完成
【发布时间】:2018-03-21 01:21:36
【问题描述】:

我写这段代码是为了分拆大量的WebClients(受reactor.ipc.netty.workerCount限制),立即启动Mono,等待所有的Monos完成:

   List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
   for (MetricConfig metricConfig : metricConfigs) {
        try {
            monos.add(extractMetrics.queryMetricData(metricConfig)
                  .doOnSuccess(result -> {
                      metricDataList.addAll(result);
                  })
                  .cache());
        } catch (Exception e) {
        }
    }

    Mono.when(monos)
          .doFinally(onFinally -> {
              Map<String, Date> latestMap;
              try {
                  latestMap = extractInsights.queryInsights();
                  Transform transform = new Transform(copierConfig.getEventType());
                  ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
              } catch (Exception e) {
                  log.error("copy: mono: when: {}", e.getMessage(), e);
              }
          })
          .block();

它“有效”,即结果符合预期。

两个问题:

  1. 这是正确吗? cache() 是否会导致 when 等待所有 Monos 完成?
  2. 高效吗?有没有办法让这更快?

谢谢。

【问题讨论】:

    标签: project-reactor spring-webflux


    【解决方案1】:

    您应该尽可能地尝试:

    • 使用 Reactor 运算符并组成单个反应链
    • 避免将doOn* 运算符用于除副作用(如日志记录)以外的其他事情
    • 避免共享状态

    您的代码可能看起来更像

    List<MetricConfig> metricConfigs = //...
    Mono<List<MetricDataModel>> data = Flux.fromIterable(metricConfigs)
        .flatMap(config -> extractMetrics.queryMetricData(config))
        .collectList();
    

    此外,cache() 运算符不会等待流完成(这实际上是 then() 的工作)。

    【讨论】:

    • when 还等待其所有 Mono 源完成。你仍然是正确的,cache() 与此无关,并且由于cache 调用产生的Mono 仅对when 可见,因此它没有任何用途 IMO
    猜你喜欢
    • 1970-01-01
    • 2018-07-13
    • 2021-10-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-07
    相关资源
    最近更新 更多