【问题标题】:Get all results from Recursive CompletableFuture从 Recursive CompletableFuture 获取所有结果
【发布时间】:2022-01-07 09:54:25
【问题描述】:

场景如下:它可能会随机生成一些数据,如果是,则需要递归检索数据,最后我需要获取所有生成的数据。

interface DataProvider {
  List<String> randomData(String url);
}

public static void main(String[] args) {
    List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
        final Random random = new Random();
        @Override
        public List<String> randomData(String url) {
            if (random.nextBoolean()) {
                System.out.println("provide some data");
                return List.of(UUID.randomUUID().toString());
            }
            return null;
        }
    }, List.of("a", "b", "c"));
    System.out.println("results are: ");
    System.out.println(strings);
}

private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return new ArrayList<>();
    List<CompletableFuture<List<String>>> collect =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .collect(Collectors.toList());
    List<CompletableFuture<List<String>>> list = new ArrayList<>();
    collect.forEach(item -> {
        CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
        list.add(listCompletableFuture);
    });
    return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}

有时程序会冻结,有时会打印一个空集合。 (provide some data 已打印)。

我哪里做错了?我对CompletableFuture 一点也不熟悉,所以可能整个递归调用都是错误的。 (或者代码可以更简单,因为CompletableFuture有很多方法)。

【问题讨论】:

    标签: java multithreading recursion completable-future


    【解决方案1】:

    您的代码确实存在一些问题:

    逻辑问题:结果只能是空列表!

    fetch() 的结果将是:

    • 如果items 为空,则为空列表
    • items 列表中的每个项目上递归应用fetch(randomData(item)) 的结果 由于没有返回非空列表的递归叶,并且父调用也不向列表添加任何内容,因此它只能返回一个空结果。

    也许您也想在结果中包含randomData()

    技术问题:在固定线程池中使用CompletableFuture.join()

    您正在使用Executors.newFixedThreadPool(4)。顾名思义,线程的数量是固定的,所以当所有线程都用完时,新的任务就会排队。

    问题是您正在使用join() 来等待其中一些新任务。所以一方面你阻塞了一个线程,另一方面你在等待你刚刚放入队列的东西。

    由于是递归的,如果达到深度4,就会死锁。

    防止这种情况的最简单方法是使用ForkJoinPool。当其中一个线程被阻塞时,这种池将产生新线程。

    附带说明,ForkJoinPool 使用守护线程,因此它们不会阻止 JVM 终止。如果您想使用固定线程池,您必须在其上调用 shutdown() 以确保它允许终止 - 或将其配置为使用守护线程。

    其他可能的改进

    您的代码正在构建大量中间集合并阻塞大量join() 调用。 CompletableFuture 旨在链接相关的计算阶段,这要归功于它对CompletionStage 的实现,而不仅仅是将其用作可以完成的Future

    在不改变你当前的逻辑的情况下,首先你可以让fetch()返回一个Stream,这会删除很多collect()/stream()调用:

    private static Stream<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
        if (items == null || items.isEmpty())
            return Stream.empty();
        List<CompletableFuture<Stream<String>>> list =
                items.stream()
                        .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                        .map(item -> item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es))
                        .collect(Collectors.toList());
        return list.stream().flatMap(CompletableFuture::join);
    }
    

    为了达到这个目的,我们做了以下改动:

    • forEach() / list.add() 替换为collect.stream(…).map(…).collect(toList())
    • 内联collect 变量
    • 删除生成的.collect(toList()).stream(),因为它是多余的
    • 将返回类型更改为Stream&lt;String&gt;,主要更改为: ** 将list 的类型更改为List&lt;CompletableFuture&lt;Stream&lt;String&gt;&gt;&gt;(因为内部Stream 来自第二个map(… -&gt; fetch()) 调用 ** 从 return 语句中删除 collect()stream()

    请注意,您需要保留中间 list 以确保在第一次调用 CompletableFuture::join 之前提供所有任务。

    这样更好,但调用仍然是同步的,需要ForkJoinPool 才能工作。既然fetch()里面的逻辑大部分都是异步的,那为什么不把整个方法做个异步呢?

    这主要需要将返回类型更改为CompletableFuture&lt;Stream&lt;String&gt;&gt;,并使用allOf()创建一个future,当所有中间future完成时才会完成:

    private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
        if (items == null || items.isEmpty())
            return CompletableFuture.completedFuture(Stream.empty());
        List<CompletableFuture<Stream<String>>> list =
                items.stream()
                        .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                        .map(item -> item.thenComposeAsync(strings -> fetch(es, dataProvider, strings), es))
                        .collect(Collectors.toList());
        return CompletableFuture.allOf(list.toArray(new CompletableFuture[0]))
                .thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
    }
    

    现在这是 100% 异步的,因为 join() 只会在已知已完成的期货上调用(感谢 allOf()),因此永远不会阻塞。

    它不再需要ForkJoinPool。事实上,它甚至可以在newFixedThreadPool(1) 上运行!好吧,只要你最后打电话给shutdown()就可以了……

    【讨论】:

    • 很好的答案!非常彻底。
    • 非常感谢你的回答,我可能需要消化一段时间,但你给我的代码似乎仍然打印出一个空集,你错过了什么吗?
    • 嗯,这是第一点:您的代码中存在一个逻辑问题,其中randomData() 从未保留。由于不清楚如何处理这些数据,我没有改变你的逻辑。因此,我提出的 2 项改进是完全相同的,希望您可以调整它们以返回您需要的内容。如果您根据要求调整您的问题,我可以尝试调整此答案。
    • 我要做的是假设初始数据是a、b、c,然后这三个数据是分别获取数据,假设a获取e,那么e需要递归获取数据,最后需要返回所有得到的数据集。如果它不复杂,如果您提供一些修复,我会很高兴!
    • 好的,但是你必须定义你想要的结果顺序。当我们实际浏览一棵树时,Tree traversal 是相关的。在这里,我们有一个任意树上的 DFS 算法,所以我猜你想要预购?所以如果以[a, b, c]开头,a返回[d, e],d返回[f],b返回[g],最终结果应该是[a, d, f, e, b, g , c]?
    猜你喜欢
    • 2015-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-31
    相关资源
    最近更新 更多