【发布时间】:2018-08-17 07:48:24
【问题描述】:
我有这两个要求:
Flux<ProductID> getProductIds() {
return this.webClient.get()
.uri(PRODUCT_ID_URI)
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(ProductID.class);
}
Mono<Product> getProduct(String id) {
return this.itemServiceWebClient.get()
.uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
.build(id))
.accept(MediaType.APPLICATION_STREAM_JSON)
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}
有了这些,我想做以下事情:
Flux<Product> getProducts() {
return Flux.create(sink -> this.gateway.getProductIds()
.doOnComplete(() -> {
log.info("PRODUCTS COMPLETE");
sink.complete();
})
.flatMap(productId -> this.getProduct(productId.getID()))
.subscribe(product -> {
log.info("NEW PRODUCT: " + product);
sink.next(product);
}));
}
当我调用它时,我得到以下输出:
PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....
当然,由于异步单声道映射,流在结果实际出现之前就关闭了。如何保持这种非阻塞状态,同时确保结果在调用 on complete 之前到达?
【问题讨论】:
-
你能解释一下你的目标是什么吗?给定 ID 时,你想拥有一个
Flux<Product>吗?除了获取产品信息之外,您是否对这些 id 进行了特定的操作?我不明白你为什么选择在这种情况下使用 Flux.create。 -
我还能做什么?我调用
getProducts()的代码是执行.toStream(),因为我不能在那里执行.subscribe()。我的目标是建立一个完全反应链,从获取 id 到获取每个 id 的产品,再到流完成后在我的视图中设置产品。 -
在你看来?所以这是一个控制器方法?
-
是的 -
getProducts()是一个控制器方法。
标签: spring-webflux project-reactor