【问题标题】:CompletableFuture to execute multiple DB queries asynchronouslyCompletableFuture 异步执行多个数据库查询
【发布时间】:2019-11-14 20:35:30
【问题描述】:

我想并行执行多个数据库查询并将结果存储在地图中。我正在尝试这样做,但是当我访问地图时,地图并没有完全填充。

我做错了什么吗?

 public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {

         Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));

         CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp)).
                    thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));

         return instrumentsEdgesMap;

}

任何帮助将不胜感激,在此先感谢。

【问题讨论】:

    标签: java multithreading java-8 parallel-processing completable-future


    【解决方案1】:

    在上述方法中,supplyAsync 将由来自ForkJoinPoolAsync 线程执行,但thenApply 方法总是通过调用线程执行。因此,您的查询将按顺序一个接一个地运行,它不是异步的

    所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。

    这是一个例子

    CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            return "SupplyAsync";
        }).thenAccept(i->{
        System.out.println(Thread.currentThread().getName()+"--"+i);
        });
    

    输出:

    ForkJoinPool.commonPool-worker-3
    main--SupplyAsync
    

    因此,如果您希望您的进程为Async,那么首先使用supplyAsync 触发所有三个数据库查询,并在CompletableFuture 中捕获输出

    CompletableFuture<Set<String>> first =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));
    
    CompletableFuture<Set<String>> second =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));
    
    CompletableFuture<Set<String>> third =  CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));
    

    然后现在用其中三个创建一个流,然后将它们收集到Map

    Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
                  new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
                  new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
           .forEach(entry->{
               entry.getValue().thenAccept(val-> instrumentsEdgesMap.put(entry.getKey(), val));
           });
    

    【讨论】:

    • 感谢您的解决方案。
    【解决方案2】:

    您必须等待期货完成后才能返回结果。

    试试类似的东西

        public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
    
            Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);
    
            CompletableFuture.allOf(
                CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp))
                    .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x)),
    
                CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp))
                    .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x)),
    
                CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp))
                    .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x)))
            .get(); // wait for completion of all three subtasks
    
            return instrumentsEdgesMap;
        }
    
    

    【讨论】:

    • 如果这是代码,我们不应该使用join而不是get吗?如果使用 get,则必须显式处理两个已检查的异常。 v get() 抛出InterruptedException、ExecutionException;
    • 你是对的。根据您的要求和用例,join 可能是更好的选择。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-25
    • 2017-12-17
    • 2021-07-28
    • 1970-01-01
    • 2021-10-12
    • 2020-06-07
    相关资源
    最近更新 更多