【问题标题】:Why can't stream of streams be reduced un parallel ? / stream has already been operated upon or closed为什么不能并行减少流流? / 流已经被操作或关闭
【发布时间】:2014-08-20 18:56:04
【问题描述】:

上下文

我偶然发现了一个相当烦人的问题:我有一个程序有很多数据源,这些数据源能够流式传输相同类型的元素,我想“映射”程序中的每个可用元素(元素顺序没关系)。

因此,我尝试使用streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat);Stream<Stream<T>> streamOfStreamOfT; 简化为简单的Stream<T> streamOfT;

由于元素顺序对我来说并不重要,我尝试使用.parallel() 并行化reduce 操作:streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat); 但这会触发java.lang.IllegalStateException: stream has already been operated upon or closed

示例

要亲自体验,只需通过评论/取消评论 .parallel() 来玩以下 main (java 1.8u20)

public static void main(String[] args) {
    // GIVEN
    List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
    for (int j = 0; j < 10; j++) {
        IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
                .limit(10);
        Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
                intStreamOf10Ints.spliterator(), true);
        listOfStreamOfInts.add(genericStreamOf10Ints);
    }
    Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
            .stream();
    // WHEN
    Stream<Integer> streamOfInts = streamOfStreamOfInts
            // ////////////////
            // PROBLEM
            //    |
            //    V
            .parallel()
            .reduce(Stream.empty(), Stream::concat);

    // THEN
    System.out.println(streamOfInts.map(String::valueOf).collect(
            joining(", ")));
}

问题

有人能解释一下这个限制吗? / 找到一种更好的方法来处理流的并行减少


编辑 1

在@Smutje 和@LouisWasserman cmets 之后,.flatMap(Function.identity()) 似乎是容忍.parallel() 流的更好选择

【问题讨论】:

  • 有一个列表列表,你可以flatMap它使用流的一个大列表...
  • 你绝对应该在这里使用flatMap
  • @Smutje 好的,flatMap 完成了这项工作,但我仍然不明白 reduce 对并行化的“反应”
  • 关于“流顺序对我来说不重要”的部分无关紧要;无论如何,并行归约都会保留顺序(只要您的归约运算符是关联的。)

标签: java parallel-processing java-8 java-stream


【解决方案1】:

您正在使用的reduce 的形式采用标识 和关联组合函数。但是Stream.empty() 不是一个值;它有状态。流不是数组或集合之类的数据结构;它们是通过可能的并行聚合操作推送数据的载体,并且它们具有某种状态(例如流是否已被消费)。想想这是如何工作的;您将构建一棵树,其中相同的“空”流出现在多个叶子中。当您尝试使用此有状态的非身份两次(不会顺序发生,而是并行发生)时,第二次尝试遍历该空流时,将完全正确地看到它已被使用.

所以问题是,您只是错误地使用了这个reduce 方法。问题不在于并行性。只是并行性暴露了潜在的问题。

其次,即使这以您认为应该的方式“工作”,您也只能获得并行性来构建表示扁平流的树;当您进行连接时,那是那里的顺序流管道。哎呀。

第三,即使这按您认为应该的方式“工作”,您仍将通过构建串联流来增加大量元素访问开销,并且您不会获得并行性的好处你在寻找。

简单的答案是扁平化流:

String joined = streamOfStreams.parallel()
                               .flatMap(s -> s)
                               .collect(joining(", "));

【讨论】:

  • .flatMap(s -&gt; s)可以表示为.flatMap(Function.identity())
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多