【问题标题】:How to transform CompletableFuture<Stream<T>> to Stream<CompletableFuture<T>>?如何将 CompletableFuture<Stream<T>> 转换为 Stream<CompletableFuture<T>>?
【发布时间】:2023-11-30 04:16:02
【问题描述】:

就是这样。真的找不到更接近的东西。我想实现以下任务链

List<Item> items = Collections.singletonList(new Item("John Smith", "1997-2014"));

Stream<CompletableFuture<List<ScrappingResult>>> scrappingFutures =
    items.stream().map(item ->
        CompletableFuture.supplyAsync(() -> new ScrappingTask(item).call()));

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
    scrappingFutures.map(resultsFuture -> ???);

Stream<CompletableFuture<TrimmingResult>> trimmingResults = scrappingFuturesUnwrapped.map(resultFuture ->
    // thenCompose?
    resultFuture.thenCompose(result -> {
        Path clipsDir = Paths.get("./"
            + result.getItem().getName()
            + "/" + result.getItem().getTimespan());

        AtomicInteger clipIdx = new AtomicInteger();

        return result.getVideo().getClips().stream()
            .map(clip -> CompletableFuture.supplyAsync(() ->
                new TrimmingTask(
                    ffmpegPath,
                    result.getVideo().getVideoUrl(),
                    clip,
                    clipsDir.resolve("clip_" + clipIdx.incrementAndGet() + ".mp3")).call())
            );
    });
);

最后一行在语法上不正确,但我希望能传达这个想法。所以,我想做两次 flatMap 之类的事情,最后得到 Stream<CompletableFuture<TrimmingResult>>

我该怎么做?

谢谢。

【问题讨论】:

  • 使用实际代码会使问题更清晰。
  • @assylias 添加了代码。
  • 您的代码与您在问题标题中写的不匹配。你的代码中没有CompletableFuture&lt;Stream&lt;T&gt;&gt;
  • 除了标题不匹配之外,您为什么坚持通过执行 两个这些转换来使代码复杂化?

标签: java java-8 completable-future


【解决方案1】:

如果我没有误解您的意图,您只想使结果变平。您可以使用Spliterator 懒惰地接收结果并使用flatMapStreams 合并到一个扁平流中,例如:

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> unwrap(each));

static <T> Stream<? extends CompletableFuture<T>> 
  unwrap(CompletableFuture<? extends List<? extends T>> master) {

    return generate(new Predicate<Consumer<? super CompletableFuture<T>>>() {

        private Iterator<? extends T> cursor;

        @Override
        public boolean test(Consumer<? super CompletableFuture<T>> consumer) {
            cursor = cursor == null ? await().iterator() : cursor;
            if (cursor.hasNext()) {
                consumer.accept(completedFuture(cursor.next()));
                return true;
            }
            return false;
        }

        //                        v--- blocked at the first time
        private List<? extends T> await() {
            try {
                return master.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new CompletionException(e);
            }
        }
    });
}

static <T> Stream<? extends CompletableFuture<T>> 
  generate(Predicate<Consumer<? super CompletableFuture<T>>> generator) {

    long unknownSize = Long.MAX_VALUE;
    return stream(new AbstractSpliterator<CompletableFuture<T>>(unknownSize, 0) {
        public boolean tryAdvance(Consumer<? super CompletableFuture<T>> action) {
            return generator.test(action);
        }
    }, false);
}

总结

上面的解决方案只是我的第一个想法,在这种情况下它不是最好的方法,你可以用big design first 来反对它。然而,即使它是一个糟糕的解决方案,但我会保留它,因为它可能会让人们以其他方式思考。更多详情,您可以从这里查看@Holger 的cmets 和there

我承认最好的做法是@Holger下面所说的,因为没有人写下来,请让我记录下来以服务更多人。

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> each.join().stream())
                               .map(CompletableFuture::completedFuture);

【讨论】:

  • 我不明白这种分离成unwrapgenerate这两种方法的目的。您在test 中编写的内容可以直接放入tryAdvance 方法中。不过不管怎样,不比一个简单的.flatMap(each -&gt; each.join().stream()) .map(CompletableFuture::completedFuture)更懒惰……
  • @Holger 是的,先生。我同意,在回答之后我发现它可以简化为你的。但我想第一次展示Spliterator的用法,也许是开销,:)
  • @Holger 我使用Predicate 将迭代逻辑从generate 中分离出来。我们可以考虑将Predicate 调整为Spliterator。先生,我说得对吗?
  • 这里没有真正的分离,因为 99% 的逻辑都在 unwrap 方法内,而且由于 Predicate 处理 Iterator,它无论如何都承载着迭代逻辑。
  • @Holger 感谢您的审核。那么答案是反模式吗?你能给我一些关键词或建议吗?我对此很感兴趣。
【解决方案2】:

一般情况下,您无法执行这种倒车操作。在函数类别语言中,您有两个介于以下类别之间的函子:

T --> Stream<T>
Stream<T> --> CompletableFuture<Stream<T>>

你之间根本没有函子

T --> CompletableFuture<T>
CompletableFuture<T> --> Stream<CompletableFuture<T>>

如果您不想关心外部CompletableFuture 并需要Stream 语义,则需要某种StreamT monad transfomer。 Java 目前没有更高种类的类型支持,所以我不确定(但可能会应用一些非标准技巧)是否可以实现类似 StreamT

【讨论】:

  • 不确定我是否同意第一个。 T--> CompletableFuture 如下 final CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() ....
  • @user889742 不太明白你的意思。当然我们可以使用,根据 monad 定义我们需要有pure(t: T): M[T],但是仅仅从T --&gt; CompletableFuture&lt;Stream&lt;T&gt;&gt; 我们不能构造T --&gt; Stream&lt;CompletableFuture&lt;T&gt;&gt;。如果你有一些集合A --&gt; B --&gt; C之间的映射,你能构造A --&gt; C --&gt; B吗?对我来说毫无意义。