【问题标题】:How to divide 1 completablefuture to many completablefuture in stream?如何在流中将 1 个 completablefuture 划分为多个 completablefuture?
【发布时间】:2016-09-30 13:06:47
【问题描述】:

例如我有这样的方法:

public CompletableFuture<Page> getPage(int i) {
    ...
}
public CompletableFuture<Document> getDocument(int i) {
    ...
}
public CompletableFuture<Void> parseLinks(Document doc) {
    ...
}

还有我的流程:

List<CompletableFuture> list = IntStream
    .range(0, 10)
    .mapToObj(i -> getPage(i))

    // I want method like this:
    .thenApplyAndSplit(CompletableFuture<Page> page -> {
        List<CompletableFuture<Document>> docs = page.getDocsId()
            .stream()
            .map(i -> getDocument(i))
            .collect(Collectors.toList());
        return docs;
    })
    .map(CompletableFuture<Document> future -> {
        return future.thenApply(Document doc -> parseLink(doc);
    })
    .collect(Collectors.toList());

应该是 flatMap() 之类的 CompletableFuture,所以我想实现这个流程:

List<Integer> -> Stream<CompletableFuture<Page>>
              -> Stream<CompletableFuture<Document>>
              -> parse each

更新

Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

Stream<CompletableFuture<Document>> documentCFS = listCFS.flatMap(page -> {
    // How to return stream of Document when page finishes?
    // page.thenApply( ... )
})

【问题讨论】:

  • 基本上你需要将 CompletableFuture 平面映射到 Stream

标签: java java-8 java-stream completable-future


【解决方案1】:

我还想尝试为Stream&lt;CompletableFuture&gt; 实现Spliterator,所以这是我的尝试。

只要这些期货中的任何一个完成,此解决方案就会创建期货结果的Stream。当然,这会丢失原始流中存在的任何排序。

原始流将被该方法立即消耗,从而触发所有元素的整个管道 - 从而失去原始流管道的惰性。假设构建流很快,由期货自己完成的艰苦工作,因此消耗流不应该是昂贵的。这也确保所有任务都已被触发,因为它强制处理源流。

所以这里是实现:

public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {
    return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
}

public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
                                                           boolean parallel) {
    return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
}

public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
    private List<CompletableFuture<? extends T>> futures;

    CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
        futures = stream.collect(Collectors.toList());
    }

    CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
        this.futures = new ArrayList<>(Arrays.asList(futures));
    }

    CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
        this.futures = new ArrayList<>(futures);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        if (futures.isEmpty())
            return false;
        CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
        // now at least one of the futures has finished, get its value and remove it
        ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
        while (it.hasPrevious()) {
            final CompletableFuture<? extends T> future = it.previous();
            if (future.isDone()) {
                it.remove();
                action.accept(future.join());
                return true;
            }
        }
        throw new IllegalStateException("Should not reach here");
    }

    @Override
    public Spliterator<T> trySplit() {
        if (futures.size() > 1) {
            int middle = futures.size() >>> 1;
            // relies on the constructor copying the list, as it gets modified in place
            Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
            futures = futures.subList(middle, futures.size());
            return result;
        }
        return null;
    }

    @Override
    public long estimateSize() {
        return futures.size();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | SIZED | SUBSIZED;
    }
}

它的工作原理是将给定的Stream&lt;CompletableFuture&lt;T&gt;&gt; 转换为这些期货的List

为了生成输出流,它只是在流式传输其值之前等待任何未来完成。

一个简单的非并行使用示例(执行器用于CompletableFutures,以便同时启动它们):

ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((i % 10) * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
            return i;
        }, executor)), false)
        .forEach(x -> {
            System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
        });
executor.shutdown();

输出:

Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13
…

如您所见,流几乎立即生成值,即使期货未按顺序完成。

将其应用于问题中的示例,这将给出(假设 parseLinks() 返回 CompletableFuture&lt;String&gt; 而不是 ~&lt;Void&gt;):

flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
                .mapToObj(this::getPage)
                // the next map() will give a Stream<CompletableFuture<Stream<String>>>
                // hence the need for flattenStreamOfFuturesOfStream()
                .map(pcf -> pcf
                        .thenApply(page -> flattenStreamOfFutures(page
                                        .getDocsId()
                                        .stream()
                                        .map(this::getDocument)
                                        .map(docCF -> docCF.thenCompose(this::parseLinks)),
                                false))),
        false)
.forEach(System.out::println);

请注意,如果您在并行模式下使用它,请注意为流和在CompletableFuture 后面运行的任务使用不同的ForkJoinPool .流将等待期货完成,因此如果它们共享同一个执行程序,您实际上可能会降低性能,甚至陷入死锁。 - 编辑:我认为这是不正确的。 ForkJoinPool 应该能够处理任何阻塞的线程,并相应增加线程数。

【讨论】:

  • 哇!看起来很完美,我一定会喜欢你的代码。当你需要使用这个的时候,你遇到过真实的情况吗?
  • 不,我只是喜欢这个问题的挑战。它显然需要更多的测试。
  • 谢谢,过几天我会测试你的代码,然后说一下我会得到什么结论
【解决方案2】:

您真的必须使用 Streams 吗?你不能把一些依赖的动作放到你的CompletableFutures 上吗?尤其是你最后一次调用返回CompletableFutures&lt;Void&gt;(当然,也可以使用Collection.forEach

List<CompletableFuture<Page>> completableFutures = IntStream
      .range(0, 10)
      .mapToObj(i -> getPage(i)).collect(Collectors.toList());

for (CompletableFuture<Page> page : completableFutures) {
    page.thenAccept(p -> {
        List<Integer> docsId = p.getDocsId();
        for (Integer integer : docsId) {
            getDocument(integer).thenAccept(d-> parseLinks(d));
        }
    });
}

编辑:好吧,我又做了一次尝试,但我不确定这是否是个好主意,因为我不是 CompletableFuture 的专家。

使用以下方法(或许有更好的实现方式):

public static <T> CompletableFuture<Stream<T>> flatMapCF(Stream<CompletableFuture<T>> stream){
    return CompletableFuture.supplyAsync( ()->
        stream.map(CompletableFuture::join)
    );
}


Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

CompletableFuture<Stream<Page>> pageCF = flatMapCF(pagesCFS);

CompletableFuture<Stream<Document>> docCF= 
   pageCF.thenCompose(a ->
        flatMapCF(a.flatMap(
                b -> b.getDocsId()
                        .stream()
                        .map(c -> getDocument(c))
        )));

问题可能在于,CompletableFuture 仅在所有结果可用时才返回

【讨论】:

  • 文档页数不知道的问题。 Future 完成后我可以知道这一点
  • @mystdeim: 但是p.getDocsId() 应该在其各自的CompletableFuture&lt;Page&gt; page 完成后调用,这对你来说不合适吗?
  • 没关系,但我想在最后得到一个文档流。也许 jdk 8 不可能,我应该使用 RxJava,因为那里存在 flatMap 运算符
  • @mystdeim:我添加了一种可能的方法,但这可能需要一些审查。
  • 谢谢你明白我需要什么我喜欢你的回答。这个方法可以满足我的需要,但它不是完全异步的。我用谷歌搜索了一下,发现来自 java 9 的 Flow 应该对我有帮助。 CompletableFuture 不是为在流中工作而创建的(
【解决方案3】:

如果您不关心操作何时完成,那么以下将简单地触发所有文档上的parseLinks()

IntStream.range(0, 10)
        .mapToObj(this::getPage)
        .forEach(pcf -> pcf
                .thenAccept(page -> page
                        .getDocsId()
                        .stream()
                        .map(this::getDocument)
                        .forEach(docCF -> docCF.thenCompose(this::parseLinks))));

否则,由于您的最后一个操作返回CompletableFuture&lt;Void&gt;,我假设您主要想知道何时完成所有操作。你可以这样做:

CompletableFuture<Void> result = CompletableFuture.allOf(IntStream.range(0, 10)
        .mapToObj(this::getPage)
        .map(pcf -> pcf
                .thenCompose(page -> CompletableFuture.allOf(page
                        .getDocsId()
                        .stream()
                        .map(docId -> getDocument(docId)
                                .thenCompose(this::parseLinks))
                        .toArray(CompletableFuture[]::new))))
        .toArray(CompletableFuture[]::new));

如果您对单个 CompletableFutures 的结果感兴趣,最好的办法可能是直接在流中,在它们被创建的地方处理它们。

您甚至可以将这一切包装在一个可重用的方法中。例如,如果parseLinks() 返回一个CompletableFuture&lt;List&lt;String&gt;&gt;,您可以定义这样的方法:

public CompletableFuture<Void> processLinks(Function<? super CompletableFuture<List<String>>, ? extends CompletableFuture<?>> processor) {
    return CompletableFuture.allOf(IntStream.range(0, 10)
            .mapToObj(this::getPage)
            .map(pcf -> pcf
                    .thenCompose(page -> CompletableFuture.allOf(page
                            .getDocsId()
                            .stream()
                            .map(docId -> getDocument(docId)
                                    .thenCompose(this::parseLinks))
                            .map(processor) // here we apply the received function
                            .toArray(CompletableFuture[]::new))))
            .toArray(CompletableFuture[]::new));
}

并像这样处理结果列表:

processLinks(linksCF -> linksCF
        .thenAccept(links -> links.forEach(System.out::println)));

打印完所有链接后,返回的CompletableFuture 将完成。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-10-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-12
    • 1970-01-01
    相关资源
    最近更新 更多