【问题标题】:Can a Collector's combiner function ever be used on sequential streams?收集器的组合器功能可以用于顺序流吗?
【发布时间】:2015-05-26 10:31:45
【问题描述】:

示例程序:

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}

所以,为了简单起见,这里没有最终的转换,所以生成的代码很简单。

现在,IntStream.range() 生成一个顺序流。我只需将结果放入Integers 中,然后我设计的Collector 将它们收集到List&lt;Integer&gt; 中。很简单。

无论我运行这个示例程序多少次,UnsupportedOperationException 都不会命中,这意味着我的虚拟组合器永远不会被调用。

我有点预料到这一点,但我已经误解了流,以至于我不得不问这个问题......

当流保证是连续的时,是否可以调用Collector 的组合器?

【问题讨论】:

  • 我敢打赌,即使没有必要,也无法保证这一点。
  • @MarkoTopolnik 或者我(再次)误读了 javadoc,它实际上是明确的,一种或另一种方式......
  • "combiner 然而总是抛出" > 我不明白你这句话的这一部分 - 总是抛出什么?
  • 文档中最相关的部分似乎是“使用收集器的缩减的顺序实现将使用供应商函数创建单个结果容器,并调用累加器函数一次每个输入元素。并行实现将对输入进行分区,为每个分区创建一个结果容器,将每个分区的内容累积到该分区的子结果中,然后使用组合器函数将子结果合并为组合结果。 i>”,这是相当宽松的措辞。
  • Here,Stuart Marks 描述了 combinerreduce 案例中的作用,他没有提到顺序合并操作的任何可能性。

标签: java java-8 java-stream


【解决方案1】:

仔细阅读ReduceOps.java 中的流实现代码会发现,仅当ReduceTask 完成时才调用组合函数,而ReduceTask 实例仅在并行评估管道时使用。因此,在当前的实现中,在评估顺序管道时永远不会调用组合器。

但是,规范中没有任何内容可以保证这一点。 Collector 是一个对其实现提出要求的接口,并且没有为顺序流授予豁免。就个人而言,我很难想象为什么顺序管道评估可能需要调用组合器,但是比我更有想象力的人可能会找到一个聪明的用途,并实现它。规范允许这样做,即使今天的实现没有这样做,您仍然需要考虑它。

这应该不足为奇。流 API 的设计中心是支持在与顺序执行平等的基础上并行执行。当然,程序可以观察它是按顺序执行还是并行执行。但 API 的设计是为了支持一种允许两者之一的编程风格。

如果您正在编写收集器,并且发现编写关联组合器函数是不可能的(或不方便或困难的),导致您希望将流限制为顺序执行,这可能意味着您正在错的方向。是时候退后一步,考虑以不同的方式解决问题了。

一种不需要关联组合函数的常见归约式操作称为fold-left。主要特点是 fold 函数严格从左到右应用,一次一个。我不知道并行化左折叠的方法。

当人们试图以我们一直在谈论的方式扭曲收藏家时,他们通常会寻找像左折叠这样的东西。 Streams API 没有直接的 API 支持这个操作,但它很容易编写。例如,假设您想使用此操作减少字符串列表:重复第一个字符串,然后追加第二个。很容易证明这个操作不是关联的:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

按顺序运行,这会产生所需的输出:

aabaabcaabaabcdaabaabcaabaabcde

但是当并行运行时,它可能会产生这样的结果:

aabaabccdde

由于它是按顺序“工作”的,我们可以通过调用 sequential() 来强制执行此操作,并通过让组合器抛出异常来支持此操作。此外,必须只调用一次供应商。中间结果没有办法合并,所以如果供应商被调用两次,我们已经有麻烦了。但是由于我们“知道”供应商在顺序模式下只被调用一次,所以大多数人并不担心这一点。事实上,我见过有人编写“供应商”返回一些现有对象而不是创建一个新对象,这违反了供应商合同。

collect() 的这种 3 参数形式的使用中,我们有三个函数中有两个违反了它们的合同。这不应该告诉我们以不同的方式做事吗?

这里的主要工作是由累加器函数完成的。为了实现折叠式缩减,我们可以使用forEachOrdered() 以严格的从左到右的顺序应用此函数。我们必须在前后做一些设置和整理代码,但这没问题:

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

当然,这可以并行运行,尽管并行运行的性能优势可能会被forEachOrdered() 的排序要求所抵消。

总而言之,如果您发现自己想要进行可变归约,但缺少关联组合器函数,导致您将流限制为顺序执行,请将问题重铸为 fold-left 在你的累加器函数上操作和使用forEachRemaining()

【讨论】:

  • 我完全了解 API,我只是好奇 ;) 像往常一样出色的答案。我也学到了一些东西......
【解决方案2】:

正如@MarkoTopolnik 和@Duncan 之前的cmets 中所观察到的,不能保证在顺序模式下调用Collector.combiner() 会产生减少的结果。事实上,Java doc 在这一点上有点主观,可能会导致不恰当的解释。

(...) 并行实现将对输入进行分区,为每个分区创建一个结果容器,将每个分区的内容累积到该分区的子结果中,然后 然后使用组合器函数合并子结果合并成一个结果

根据NoBlogDefFound 组合器仅用于并行模式。请参阅下面的部分引用:

combiner() 用于将两个累加器合并为一个。当collector并行执行时使用,首先拆分输入Stream并独立收集部分。

为了更清楚地说明这个问题,我重新编写了第一个代码,并提出了两种方法(串行和并行)。


public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {

        final Collector<Integer, ?, List<Integer>> c =
                Collector
                    .of(ArrayList::new, List::add, nope());

        // approach sequential
        Stream<Integer> sequential = IntStream
                .range(0, 10_000_000)
                .boxed();

        System.out.println("isParallel:" + sequential.isParallel());
        sequential
                .collect(c);

        // approach parallel
        Stream<Integer> parallel = IntStream
                .range(0, 10_000_000)
                .parallel()
                .boxed();

        System.out.println("isParallel:" + parallel.isParallel());
        parallel
                .collect(c);
    }
}

运行这段代码后我们可以得到输出:

isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
    at com.stackoverflow.lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18)
    at com.stackoverflow.lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160)

所以,根据这个结果我们可以推断Collector's combiner只能被并行执行调用。

【讨论】:

  • 不过这只是Stream 的一个例子,更重要的是它由JDK 提供...
  • @fge 我在想你的主要问题只是关于组合器和流。你对我应该提到的其他观点有什么想法吗?我的意图是展示与您评论相同的用法,但添加了并行行为,在这种情况下会引发异常。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-07-11
  • 1970-01-01
  • 2017-04-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多