【问题标题】:CompletableFuture - Run multiple rest calls in parallel and get different resultCompletableFuture - 并行运行多个休息调用并获得不同的结果
【发布时间】:2020-07-10 17:39:51
【问题描述】:

我有一个相当普遍或独特的要求。例如,我有以下AccountDetails 列表:

List<AccountDetails>

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;   
}

bankAccountId 之外的所有上述字段都是从外部 REST 服务调用中提取的。 我想并行调用所有 REST 服务并更新列表中的每个对象:

所以,它看起来像下面这样:

对于每个accountDetails

  • 调用抵押REST服务并更新martgageAccountIdfield(REST返回MortgageInfo对象)
  • 调用事务REST服务并更新noOfTrans字段(REST返回Transactions对象)
  • 调用地址REST服务并更新addressLine字段(REST返回Address对象)
  • 调用链接 REST 服务并更新 externalLink 字段。 (REST 返回Links 对象)

我希望所有上述调用并行进行,并针对列表中的每个 AcccountDetails 对象。 如果有异常,我想优雅地处理它。请注意,上述每个 REST 服务都返回不同的自定义对象

我对如何使用CompletableFuture 链接来实现这一点感到困惑。 不确定allOfthenCombine(只需要两个)或thenCompose 应该使用以及如何将所有这些放在一起。

有什么例子/想法吗?

【问题讨论】:

  • 我会尽量避免的一件事是为每个对象调用端点。如果您的后端服务有批量请求,您应该遵从这些请求。因此,您将收集所有 ID,然后向后端服务发出一个请求。
  • @SamOrozco 你是对的。但是调用被缓存了。所以单个调用比批量调用更好,这就是循环单个记录的原因

标签: java multithreading spring-boot java-8 completable-future


【解决方案1】:
AccountDetails accountDetails = new AccountDetails();

CompletableFuture.allOf(
                        CompletableFuture.
                                supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setMortgageAccountId(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setNoOfTrans(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setAddressLine(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setExternalLink(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                ).join();

【讨论】:

  • handle() 是什么,里面有什么?另外,我有AccountDetails 的列表,所以我会将上面的所有内容放在一个循环Stream 中。所以这意味着,每个循环运行一个新线程,并且在每个线程下,大约有 4 个新线程运行?
  • handle() -> 返回一个新的 CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为提供函数的参数执行。 docs.oracle.com/javase/8/docs/api/java/util/concurrent/… 是的,您可以为帐户详细信息列表运行循环并使用上面给出的可完成未来填充它,每个循环运行不一定会在不同的线程上运行。
  • 每当您使用 CompletableFuture.supplyAsync 或 CompletableFuture.supplyAsync 时,您总是希望提供自己的执行程序。使用默认的 forkjoin 池没有太多可用线程,并且几乎总是会导致调用备份。
  • allOf() 末尾的join() 与每个supplyAsyc() 的区别是什么?
【解决方案2】:

如果我简单地将您的帐户类称为:

class Account {
  String fieldA;
  String fieldB;
  String fieldC;

  Account(String fieldA, String fieldB, String fieldC) {
    this.fieldA = fieldA;
    this.fieldB = fieldB;
    this.fieldC = fieldC;
  }
}

然后您可以使用CompletableFuture#allOf(...) 等待所有可完成未来的结果,每个字段更新一个,然后分别从这些期货中检索结果。我们不能使用allOf 的结果,因为它什么都不返回(void)。

Account account = CompletableFuture.allOf(cfA, cfB, cfC)
    .thenApply(ignored -> {
      String a = cfA.join();
      String b = cfB.join();
      String c = cfC.join();
      return new Account(a, b, c);
    }).join(); // or get(...) with timeout

我们可以在thenApply 中使用join,因为所有可完成的期货都在这个阶段完成。您可以修改上面的代码块以适应您的逻辑,例如更新字段而不是创建新对象。请注意,当可完成的未来异常完成时,上面的join() 可能会引发异常。您可以在将未来提交给allOf(...)之前将您的可完成未来正确更改为handle(),或者在使用join()之前询问是否isCompletedExceptionally()

CompletableFuture.allOf(cfA, cfB, cfC)
    .thenRun(() -> {
      if (!cfA.isCompletedExceptionally()) {
        account.fieldA = cfA.join();
      }
      if (!cfB.isCompletedExceptionally()) {
        account.fieldB = cfB.join();
      }
      if (!cfC.isCompletedExceptionally()) {
        account.fieldC = cfC.join();
      }
    }).join(); // or get(...) with timeout

在一个完成阶段更新字段的好处是这些操作在同一个线程中完成,因此您不必担心并发修改。

【讨论】:

  • 帮我理解。 join() 不是阻塞操作吗?
  • @KevinRave,allOf() 在所有给定的 CompletableFutures 完成时完成,因此allOf() 正在阻止操作,join() 仅检索结果。
  • join() 末尾的allOf() 与每个supplyAsyc() 之间的区别是什么,例如answer below
  • @KevinRave,正如你所说,join() 是一个阻塞操作。所以我们应该尽可能避免使用join(),以使整个逻辑不阻塞。如果我们在每个supplyAsync() 之后执行join(),这些异步逻辑将变为同步并阻塞当前线程,直到它们执行完成(成功或异常)。另一方面,在allOf() 末尾使用join() 减少了阻塞操作的次数,即阻塞当前线程一次而不是N 次。
【解决方案3】:

既然你已经标记了spring-boot,我想你会使用它并且你的服务是用spring框架编写的。那么我提供了一个与spring框架相关的答案。

首先,我创建了一个接口,用于将 REST API 实现为异步。

public interface AsyncRestCall<T> {
   /** this is a hypothetical method with hypothetical params!*/
   CompletableFuture<T> call(String bankAccountId); 
   String type();
}

那么你可以为你的服务实现这样的:

正如您在此实现中看到的,我使用了MortgageRest,它代表Mortgage 的休息服务。

 @Service
 public class MortgageService implements AsyncRestCall<MortgageInfo> {

   private final MortgageRest mortgageRest;

   @Autowired
   public MortgageService(MortgageRest mortgageRest) {
       this.mortgageRest = mortgageRest;
   }

   @Override
   public CompletableFuture<MortgageInfo> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
    }

   @Override
   public String type() {
      return "mortgage";
   } 
} 

抵押贷款:

@Service
public class MortgageRest {
  private RestTemplate restTemplate;
  public MortgageRest(RestTemplate restTemplate) {
     this.restTemplate = restTemplate;
  }
  public MortgageInfo service(String bankAccountId) {
     return new MortgageInfo("123455" + bankAccountId);
  }
}

对于其他休息服务这样做。

@Service
public class TransactionService implements AsyncRestCall<Transactions> {

   private final TransactionRest transactionRest;

   public TransactionService(TransactionRest transactionRest) {
      this.transactionRest = transactionRest;
   } 

   @Override
   public CompletableFuture<Transactions> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(transactionRest::service);
   }

   @Override
   public String type() {
       return "transactions";
   } 
} 

TransactionRest:

 @Service
 public class TransactionRest {

   public Transactions service() {
       return new Transactions(12);
   }
 }

现在您需要访问所有 AsyncRestCall 实现。对于这个porpuse,你可以声明一个类似这样的类:

@Service
public class RestCallHolder {

  private final List<AsyncRestCall> asyncRestCalls;

  public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
      this.asyncRestCalls = asyncRestCalls;
  }

  public List<AsyncRestCall> getAsyncRestCalls() {
      return asyncRestCalls;
  }
}

AccountDetailService(你可以说出你喜欢的东西)使用CompleteableFuture来并行调用rest服务。

在此服务中,每个bankAccountId 休息调用将存储在Map&lt;String, Map&lt;String, Object&gt;&gt; result = new HashMap&lt;&gt;(); 中,外部映射键将bankAccountId 值作为键存储,其值是休息服务调用,它们将存储在映射(内部映射)中。键是类型,值是休息呼叫响应。最后通过循环 accountDetails 将更新其属性。

@Service
public class AccountDetailService {

  private final RestCallHolder restCallHolder;

  public AccountDetailService(RestCallHolder restCallHolder) {
      this.restCallHolder = restCallHolder;
  }

  public List<AccountDetail> update(List<AccountDetail> accountDetails) {
     Map<String, Map<String, Object>> result = new HashMap<>();
     List<AccountDetail> finalAccountDetails = new ArrayList<>();

     accountDetails.forEach(accountDetail -> {
          List<CompletableFuture> futures = restCallHolder.getAsyncRestCalls()
                    .stream()
                    .map(rest -> rest.call(accountDetail.getBankAccountId()))
                    .collect(Collectors.toList());

     CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                 .thenAccept(aVoid -> { 
                    Map<String, Object> res = restCallHolder.getAsyncRestCalls()
                              .stream()
                              .map(rest -> new AbstractMap.SimpleEntry<>(rest.type(),
                                  rest.call(accountDetail.getBankAccountId()).join()))
                              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                           result.put(accountDetail.getBankAccountId(), res);
                      }
                   ).handle((aVoid, throwable) -> {
                      return null; // handle the exception here 
             }).join();
            }
    );

      accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
             .bankAccountId(accountDetail.getBankAccountId())
             .mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
             .noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
             .build()));
     return finalAccountDetails;
   }
 }

【讨论】:

  • 无论何时使用CompletableFuture.supplyAsyncCompletableFuture.supplyAsync,您总是希望提供自己的执行程序。使用默认的 forkjoin 池没有太多可用线程,并且几乎总是会导致调用备份。
【解决方案4】:

我将负责将字段值获取到模型对象本身。
以下是三种替代解决方案,使用并行流、流和执行器,以及 for 循环和执行器。

解决方案 1:

accounts.parallelStream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(RestRequest::run);

解决方案 2:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
accounts.stream()
        .<Runnable>flatMap(account -> Stream.of(account::updateMortgage, account::updateNoOfTrans,
                account::updateAddressLine, account::updateExternalLink))
        .map(RestRequest::new)
        .forEach(executor::execute);

解决方案 3:

Executor executor = Executors.newFixedThreadPool(PARALLELISM);
for (AccountDetails account : accounts) {
    execute(executor, account::updateMortgage);
    execute(executor, account::updateNoOfTrans);
    execute(executor, account::updateAddressLine);
    execute(executor, account::updateExternalLink);
}

private static void execute(Executor executor, Runnable task) {
    executor.execute(new RestRequest(task));
}

常用代码:

class RestRequest implements Runnable {
    private final Runnable task;

    RestRequest(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            // A request failed. Others will not be canceled.
        }
    }
}

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;

    void fetchMortgage() {
        mortgageAccountId = MortgageService.getMortgage(bankAccountId).getAccountId();
    }

    void fetchNoOfTrans() {
        noOfTrans = TransactionService.getTransactions(bankAccountId).getAmount();
    }

    void fetchAddressLine() {
        addressLine = AddressService.getAddress(bankAccountId).getLine();
    }

    void fetchExternalLink() {
        externalLink = LinkService.getLinks(bankAccountId).getExternal();
    }
}

【讨论】:

  • 我不同意获取外部放置模型的责任。
  • 与下面的the below answer相比,它的效率和性能如何
猜你喜欢
  • 1970-01-01
  • 2021-06-28
  • 2022-12-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多