【发布时间】:2018-03-21 01:21:36
【问题描述】:
我写这段代码是为了分拆大量的WebClients(受reactor.ipc.netty.workerCount限制),立即启动Mono,等待所有的Monos完成:
List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
for (MetricConfig metricConfig : metricConfigs) {
try {
monos.add(extractMetrics.queryMetricData(metricConfig)
.doOnSuccess(result -> {
metricDataList.addAll(result);
})
.cache());
} catch (Exception e) {
}
}
Mono.when(monos)
.doFinally(onFinally -> {
Map<String, Date> latestMap;
try {
latestMap = extractInsights.queryInsights();
Transform transform = new Transform(copierConfig.getEventType());
ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
} catch (Exception e) {
log.error("copy: mono: when: {}", e.getMessage(), e);
}
})
.block();
它“有效”,即结果符合预期。
两个问题:
- 这是正确吗?
cache()是否会导致when等待所有 Monos 完成? - 它高效吗?有没有办法让这更快?
谢谢。
【问题讨论】:
标签: project-reactor spring-webflux