【问题标题】:How to convert List<T> to Flux<T> by using Reactor 3.x如何使用 Reactor 3.x 将 List<T> 转换为 Flux<T>
【发布时间】:2020-08-24 07:53:02
【问题描述】:

我有一个 Asyn 调用节俭接口:

public CompletableFuture<List<Long>> getFavourites(Long userId){
    CompletableFuture<List<Long>> future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
    }
    return future;
}

现在它返回一个 CompletableFuture。然后我用 Flux 调用它来做一些处理器。

public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random()*100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}

但是,我想从 getFavourites 方法中获取 Flux,我可以在 getRecommend 方法中使用它。
或者,您可以推荐一个Flux API,我可以将List&lt;Long&gt; recommendList 转换为Flux&lt;Long&gt; recommendFlux

【问题讨论】:

    标签: java reactive-programming project-reactor


    【解决方案1】:

    要将CompletableFuture&lt;List&lt;T&gt;&gt; 转换为Flux&lt;T&gt;,您可以使用Mono#fromFutureMono#flatMapMany

    var future = new CompletableFuture<List<Long>>();
    future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
        CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
    
    Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
    
    flux.subscribe(System.out::println);
    

    在回调中异步接收的List&lt;T&gt; 也可以在不使用CompletableFuture 的情况下转换为Flux&lt;T&gt;。 你可以直接使用Mono#createMono#flatMapMany

    Flux<Long> flux = Mono.<List<Long>>create(sink -> {
      Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
          sink.success(list);
        }
    
        @Override
        public void onError(Exception e) {
          sink.error(e);
        }
      };
      client.call("query", callback);
    }).flatMapMany(Flux::fromIterable);
    
    flux.subscribe(System.out::println);
    

    或者简单地使用Flux#create一次通过多个发射:

    Flux<Long> flux = Flux.create(sink -> {
      Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
          list.forEach(sink::next);
        }
    
        @Override
        public void onError(Exception e) {
          sink.error(e);
        }
      };
      client.call("query", callback);
    });
    
    flux.subscribe(System.out::println);
    

    【讨论】:

    • Int 直接使用 Mono#create 和 Mono#flatMapMany 的部分。我无法实现回调。我是否使用了错误的工具?或者你只是提供一个伪代码?
    • @dyy.alex, Callback 是你必须自己创建的类的接口。或者使用客户端库提供的库(client 变量后面的库)。
    猜你喜欢
    • 2014-02-05
    • 1970-01-01
    • 2019-04-13
    • 1970-01-01
    • 2010-11-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-29
    相关资源
    最近更新 更多