【问题标题】:How to handle exception in completableFuture in the for loop如何在for循环中处理completableFuture中的异常
【发布时间】:2021-07-03 17:16:54
【问题描述】:

我是 CompletableFuture 的新手,我想要做的是我在 Spring Boot 项目中有这个逻辑,我正在尝试使用 CompletableFuture 将其转换为并行处理方法。

@Transaction
void serviceMethod{
    for(Object obj : objList) //objlist can have 10000 objects and its a multi level composite objects 
    {
        //Get corresponding entity obj from the database, if not found throw user exception 
        //Process i.e. change some fields 
        

    }
}

在上述逻辑中,该方法使用 @Transaction 注释,我没有显式调用 JPA save 来保存实体。

现在,我正在尝试使用上述逻辑进行并行处理。

@Transaction
void serviceMethod{
    for(Object obj : objList) //objlist can have 10000 objects and its a multi level composite objects 
    {
    
        //Get corresponding entity obj from the database, if not found throw user exception 
        CompletableFuture<Optional<Obj>> optionalObjFuture = CompletableFuture.supplyAsync( () -> {//get the object from repository})
        
        
        CompletableFuture<Obj> objFuture = optionalObjFuture.thenApply( optionalObj -> {
                            if(obj.isPresent()){
                                return obj.get();
                            }
                            else{
                                throw user exception;
                            }
                        })
                        
        ////Process i.e. change some fields 

    }
}

现在的问题是

  1. 当出现异常时,我必须遵循什么方法来中断 for 循环?
  2. 在这种情况下如何处理事务。有没有什么方法可以处理事务而不需要对存储在数据结构中的已处理对象调用 saveAll?

【问题讨论】:

    标签: exception spring-transactions spring-boot-actuator completable-future


    【解决方案1】:

    对于 (1),您可以将这些 10_000 调用拆分为更小的批次。例如:

    public static void main(String[] args) throws Exception {
    
        int innerIterations = 5;
        ExecutorService service = Executors.newFixedThreadPool(innerIterations);
    
        for(int i=0;i<10;++i) {
    
            List<CompletableFuture<String>> list = new ArrayList<>();
            int j = 0;
            for(;j<innerIterations;++j) {
                CompletableFuture<Optional<String>> cf = CompletableFuture.supplyAsync(() -> Optional.of("a"), service);
    
    
                CompletableFuture<String> cf2 = cf.thenApplyAsync(x -> {
                    if(x.isPresent()) {
                        return x.get();
                    } else {
                        throw new RuntimeException("test");
                    }
                }, Executors.newFixedThreadPool(2));
    
                list.add(cf2);
            }
    
            i += j;
    
            CompletableFuture<Void> all5 = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
            if(all5.isCompletedExceptionally()) {
                // log exception, whatever
                break;
            } else {
                List<String> set = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
                System.out.println(set);
            }
    
        }
    
    }
    

    这做了几件事:

    • 将初始10拆分成两批5
    • 就这 5 个期货致电 CompletableFuture::allOf。阅读文档,看看如果至少有一个未来从那些 5 中失败,那么 all5 也失败了。
    • 如果all5 没有失败(即all5.isCompletedExceptionally() 没有通过),则调用CompletableFuture::join 以获取所有结果。真的不会有“加入”,因为之前的allOff 已经在等待它们完成了。

    对于第二个问题 - 你不能。您将需要手动创建事务,但 Spring 使它变得相当容易。

    【讨论】:

    • 感谢您的指导。这肯定会对我有所帮助。
    • @Vibha 如果这有任何帮助,您可以接受答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多