【问题标题】:How to run multiple service calls in parallel using CompletableFuture?如何使用 CompletableFuture 并行运行多个服务调用?
【发布时间】:2022-01-13 12:54:05
【问题描述】:

我正在向用户返回以下响应

class FinalResponseDTO {
    List<Service1ResponseDTO> service1ResponseDTO;
    Long totalCount;
    List<Service2ResponseDTO> service2ResponseDTO;
}

到目前为止,我正在进行三个连续调用来计算这个FinalResponseDTO,每个调用都可以独立于其他调用运行。我尝试将三个不同的CompletableFuture 设为:

CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());

如果我拨打CompletableFuture.allOf(future1, future2, future3).join(); 还是应该拨打CompletableFuture.allOf(future1, future2, future3).get();?即使我调用这些joinget 中的任何一个,那么我应该如何从中构造FinalResponseDTO。我是CompletableFuture 等 Java 8 并发功能的新手,我很困惑,因为每个未来的每个返回类型都不同,我应该如何获得所有这些未来的组合响应,然后构造我的最终输出?

【问题讨论】:

    标签: java multithreading asynchronous completable-future


    【解决方案1】:

    来自CompletableFuture.allOf()的Javadocs:

    当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture。如果任何给定的 CompletableFuture 异常完成,则返回的 CompletableFuture 也会这样做,并且 CompletionException 将此异常作为其原因。否则,给定 CompletableFuture 的结果(如果有)不会反映在返回的 CompletableFuture 中,但可以通过单独检查它们来获得

    所以当组合 CompletableFuture 完成时,您可以通过应用构造对象的函数来检查值并使用简单的构造函数构造最终响应对象:

    CompletableFuture.allOf(future1, future2, future3).thenApply(v -> 
        new FinalResponseDTO(future1.join(), future2.join(), future3.join())
    );
    

    【讨论】:

      【解决方案2】:
          CompletableFuture<List<Service1ResponseDTO> future1 = 
              CompletableFuture.supplyAsync(() -> service1.callMethod1());
          CompletableFuture<Long> future2 = 
              CompletableFuture.supplyAsync(() -> service2.callMethod2());
          CompletableFuture<List<Service2ResponseDTO>> future3 = 
              CompletableFuture.supplyAsync(() -> service3.callMethod3());
      
          CompletableFuture.allOf(future1, future2, future3).get();
      
          return new FinalResponseDTO(future1.join(), future2.join(), future3.join());
      

      注意:supplyAsyncForkJoinPool.commonPool() 上运行,因此提供您自己的 executor 是一个不错的选择,例如Executors.newCachedThreadPool():

      CompletableFuture.supplyAsync(() -> action, executor);
      

      【讨论】:

      • 这会阻止调用者。将您的 join() 语句放入 thenApply 可以让您保留“管道”。
      【解决方案3】:

      您可以回退到 CompletableFuture 的有些隐藏的应用程序,而不是使用 CompletableFuture.allOf

      static <T, R> CompletableFuture<R> alsoApply(CompletableFuture<T> future,     CompletableFuture<Function<T, R>> f) {
          return f.thenCompose(future::thenApply);
      }
      

      使用这个辅助函数,您可以在并行线程中执行期货:

      CompletableFuture<String> future = alsoApply(
          CompletableFuture.supplyAsync(() -> "a"),
          CompletableFuture.supplyAsync(() -> "b")
      .thenApply(b -> a -> a + b));
      
      assertEquals("ab", future.get());
      

      请参阅 this question and answer 了解它的来源以及它的工作原理和方式。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-10-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-06-28
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多