【问题标题】:How to map a flux with mono?如何用单声道映射通量?
【发布时间】:2018-08-17 07:48:24
【问题描述】:

我有这两个要求:

Flux<ProductID> getProductIds() {
    return this.webClient.get()
            .uri(PRODUCT_ID_URI)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(ProductID.class);
}

Mono<Product> getProduct(String id) {
    return this.itemServiceWebClient.get()
            .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                    .build(id))
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}

有了这些,我想做以下事情:

Flux<Product> getProducts() {
    return Flux.create(sink -> this.gateway.getProductIds()
            .doOnComplete(() -> {
                log.info("PRODUCTS COMPLETE");

                sink.complete();
            })
            .flatMap(productId -> this.getProduct(productId.getID()))
            .subscribe(product -> {
                log.info("NEW PRODUCT: " + product);

                sink.next(product);
            }));
}

当我调用它时,我得到以下输出:

PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....

当然,由于异步单声道映射,流在结果实际出现之前就关闭了。如何保持这种非阻塞状态,同时确保结果在调用 on complete 之前到达?

【问题讨论】:

  • 你能解释一下你的目标是什么吗?给定 ID 时,你想拥有一个 Flux&lt;Product&gt; 吗?除了获取产品信息之外,您是否对这些 id 进行了特定的操作?我不明白你为什么选择在这种情况下使用 Flux.create。
  • 我还能做什么?我调用getProducts() 的代码是执行.toStream(),因为我不能在那里执行.subscribe()。我的目标是建立一个完全反应链,从获取 id 到获取每个 id 的产品,再到流完成后在我的视图中设置产品。
  • 在你看来?所以这是一个控制器方法?
  • 是的 - getProducts() 是一个控制器方法。

标签: spring-webflux project-reactor


【解决方案1】:

假设getProducts 是一个控制器方法,并且您想在模型中添加这些产品以在视图模板中呈现,您可以这样解决这个问题:

@GetMapping("/products")
public String getProducts(Model model) {

    Flux<Product> products = this.gateway.getProductIds()
            .flatMap(productId -> this.getProduct(productId.getID()));
    model.put("products", products);
    // return the view name
    return "showProducts";
}

【讨论】:

    猜你喜欢
    • 2021-07-11
    • 2020-09-16
    • 2017-07-30
    • 2020-06-08
    • 1970-01-01
    • 2017-06-20
    • 1970-01-01
    • 2013-08-22
    • 2019-12-30
    相关资源
    最近更新 更多