【问题标题】:Collect results from parallel stream从并行流中收集结果
【发布时间】:2015-06-03 17:53:31
【问题描述】:

我有一段这样的代码:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    ArrayList<Egg> eggs = new ArrayList<>();
    while (hen.hasEgg()) {
        eggs.add(hen.getEgg());
    }
    return eggs;
}).flatMap(Collection::stream).collect(Collectors.toList());

但是通过这种方式,我必须为每只母鸡创建一个 ArrayList,并且在母鸡 100% 处理之前不会收集鸡蛋。我想要这样的东西:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    while (hen.hasEgg()) {
        yield return hen.getEgg();
    }
}).collect(Collectors.toList());

但是Java没有yield return。有没有办法实现它?

【问题讨论】:

  • 您应该添加您的Hen 类定义。另外,您具体要完成什么?您是否希望在继续收集鸡蛋时可以开始处理部分结果(如在部分填充的List&lt;Egg&gt; 中)?或者您只是想收集所有母鸡的所有鸡蛋,而在收集之前必须完成所有母鸡的处理这一事实不是问题?
  • 在这种情况下,我只是收集所有鸡蛋。但是,如果我们最终使用 .processEggs() 而不是 .collect() ,看看我们能做些什么会很好。

标签: java parallel-processing java-8 java-stream


【解决方案1】:

您的Hen 类不适用于Stream API。如果你不能改变它并且它没有其他有用的方法(如Collection&lt;Egg&gt; getAllEggs()Iterator&lt;Egg&gt; eggIterator()),你可以像这样创建一个鸡蛋流:

public static Stream<Egg> eggs(Hen hen) {
    Iterator<Egg> it = new Iterator<Egg>() {
        @Override
        public boolean hasNext() {
            return hen.hasEgg();
        }

        @Override
        public Egg next() {
            return hen.getEgg();
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}

现在您可以通过以下方式使用它:

List<Egg> eggs = hens.parallelStream()
                     .flatMap(hen -> eggs(hen))
                     .collect(Collectors.toList());

当然,如果您可以更改 Hen 类,则可能会有更好的 Stream 实现。

【讨论】:

  • getEgg() 很复杂,我需要考虑如何更改它。但是,如果我们有一个母鸡的 getAllEggs(),它与我当前的实现不一样,您必须创建一个数组列表来收集母鸡的所有鸡蛋吗?
  • Hen 就像迭代器;可以轻松地从母鸡创建 Iterator。然后用 Spliterator.spliteratorUnknownSize 把它变成一个 Spliterator,用 StreamSupport.把它变成一个 stream(),然后 flatMap 它。
  • @user2316040 类似迭代器的构造(在本例中为 hasEgg/getEgg)的常见问题是检查逻辑和获取逻辑重叠,因此它们可能不得不以一种不舒服的方式共享状态.您可以考虑通过 Spliterators.AbstractSpliterator 创建一个流。这只需要您重写 tryAdvance(),它将检查和返回操作融合到一个方法中。
  • 具有讽刺意味的是,Spliterators 创建了这种方式,试图通过在拆分时将值缓冲到数组中来提供并行支持,因此所有试图提供惰性获取的工作都会在这一点。
  • @Holger 从迭代器创建的拆分器具有相同的并行支持批处理问题。我要解决的是有人可以轻松地提出流。如果您有一个方便的迭代器,那就太好了。如果没有,hasNext() 可能必须做一堆工作并缓存结果,而 next() 必须检查并使缓存无效等。使用 spliterator.tryAdvance() 这一切都可以在一个方法中完成,所以它可能写起来更方便。
【解决方案2】:

使用hasEgg()getEgg() 的迭代逻辑是有状态的,因为这些方法的结果取决于之前的调用。因此,处理单个Hen 无法并行处理,除非您设法完全更改接口。

也就是说,您不必担心ArrayList。当流实现并行执行collect 操作时,它必须缓冲每个线程的值,然后再组合这些缓冲区。甚至可能该操作根本无法从并行执行中受益。

您可以做的是将ArrayList 替换为Stream.Builder,因为它已针对仅在构造Stream 之前添加的用例进行了优化:

List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
    Stream.Builder<Egg> eggStream = Stream.builder();
    while(hen.hasEgg()) {
        eggStream.add(hen.getEgg());
    }
    return eggStream.build();
}).collect(Collectors.toList());

【讨论】:

  • 到目前为止,这给了我最好的性能。但是我接受了另一个,因为那个也适用于最终使用 .processEggs() 而不是 .collect() 的情况。
【解决方案3】:

假设存在getEggs() 方法,您可以使用以下方法收集所有鸡蛋。

List<Egg> eggs = hens.parallelStream()
    .filter(Hen::hasEggs)
    .map(Hen::getEggs)
    .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);

代码假定getEggs() 返回Collection。如果getEggs()Hen 没有Eggs 时返回一个空的Collection,您可以消除filter(Hen::hasEggs)

【讨论】:

  • 为什么你认为.collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);.collect(Collectors.toList());好?
  • 我做了一些测试,似乎 .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll) 不如 .collect(Collectors.toList()) 快
  • 问题是你需要收集所有的鸡蛋。如果您只使用Collectors.toList(),您将收集List&lt;Collection&lt;Egg&gt;&gt;。通过使用.collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll),您可以将getEggs() 返回的所有Collection&lt;Egg&gt; 组合成一个List&lt;Egg&gt;
  • @StuartMarks 这个答案是对围绕如果HengetEggs() 方法是否可以收集所有Eggs 的讨论的回应。在没有定义Hen 类的情况下(我曾要求但显然不可用),并且在没有声明Hen 没有getEggs() 方法的情况下,我觉得这不是不合理的假设存在getEggs()
  • @pens-fan-69 当然,这是一个合理的假设,我只是开个小玩笑。
猜你喜欢
  • 1970-01-01
  • 2016-03-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多