【问题标题】:Unexpected closing of a Stream while concatenating them连接它们时意外关闭流
【发布时间】:2019-12-12 19:08:58
【问题描述】:

使用 Java 8(如果这很重要),我有一个难以理解的行为。

假设我有一个Entry 类:

static class Entry {
    String key;
    List<String> values;
    public Entry(String key, String... values) {
        this.key = key;
        this.values = Arrays.asList(values);
    }
}

还有一个实例列表:

List<Entry> entries = Arrays.asList(
    new Entry("a", "a1"),
    new Entry("b", "b1"),
    new Entry("a", "a2"));
);

现在我想收集所有具有相同键的条目(并保持不同的值),我偶然发现了“IllegalStateException:流已被操作或关闭”。

生成它的最少代码是:

entries.stream().collect(
  Collectors.groupingBy(
    e -> e.key,
    Collectors.mapping(
        e -> e.values.stream(), 
        Collectors.reducing(Stream.<String>empty(), Stream::concat))
  )
);

(我会添加一个collectingAndThen 来满足我的要求,但这不是我的问题的重点)

我看不到代码的哪一部分消耗/作用于流。此外,如果我将代码更改为以下内容,它可以工作:

entries.stream().collect(
  Collectors.groupingBy(
    e -> e.key,
    Collectors.mapping(
        e -> e.values.stream(), 
        Collectors.reducing(Stream::concat))
  )
);

我宁愿使用以前的代码,因为后者给我一个Map&lt;K, Optional&lt;V&gt;&gt;,而前者给我一个Map&lt;K, V&gt;

但问题是:中性元素的使用在减少方面有什么不同,最终导致(至少)其中一个流被消耗?

【问题讨论】:

  • 中性元素是指identity 元素吗?它确保您肯定会映射一个T value,并且它不能在Optional&lt;T&gt; 中尽可能不存在。根据要求我想收集所有具有相同键(并保持不同值)的条目,您不是在寻找Map&lt;String, Set&lt;String&gt;&gt; map 作为输出吗?
  • Stream::concat 使用传入的流。由于您指定了相同的 Stream.empty() 作为标识元素,因此这是对 Reduction 的错误使用。事实上,即使没有标识元素,对传入流的隐含消费也是对输入的修改,因此违反了合同。只要没有流实例在流中多次出现,您就可以逃脱。
  • 主要问题是你不能有一个流作为标识元素,因为流不能被重用,所以当它试图重用它时,抛出说它被操作或关闭。
  • @Naman 是的,Map&lt;String, Set&lt;String&gt;&gt; 是目标(我知道实现它的几种不同方法),但我在此过程中偶然发现了这个问题。是的,我所说的中立,是指空流。我理解它允许在结果中删除 Optional 的方式和原因,我没想到它会像以前那样失败,因此提出了问题。
  • @GPI 那么你学到了一些非常重要的东西。合同要求id op x = x 始终有效,无论您假设它何时可能被评估。例如,当使用并行流时,它也可能被多次使用,即使在没有分组的情况下使用归约。在这里使用flatMapping 是首选,但更好的是在此位置执行您打算对结果流执行的任何操作。

标签: java java-stream


【解决方案1】:

主要问题可以归结为这个类似的例子:

Stream<String> identity = Stream.empty();
Stream<String> stream1 = Stream.of("1");
Stream<String> stream2 = Stream.of("2");
Stream.concat(identity, stream1); //works
Stream.concat(identity, stream2); //java.lang.IllegalStateException

换句话说,

Collectors.reducing(Stream.<String>empty(), Stream::concat)

使用Stream.&lt;String&gt;empty() 创建一个 流对象,并将其作为多级归约中的标识值重用。幸运的是,您已经有了解决方法。


正如文档中所警告的那样,并且在 cmets 中也指出,不鼓励重复的流连接:

从重复连接构造流时要小心。访问深度串联流的元素可能会导致深度调用链,甚至是 StackOverflowException。

我能想到的另一种方法是在分组之前展平流:

//This yields a Map<String, List<String>>
entries.stream()
    .flatMap(v -> v.values.stream().map(val -> new SimpleEntry<>(v.key, val)))
    .collect(Collectors.groupingBy(
        Map.Entry::getKey, 
        Collectors.mapping(Map.Entry::getValue, 
                           Collectors.toList())));

【讨论】:

  • Not a recommended 解决方法:“从重复连接构造流时要小心。访问深度串联流的元素可能会导致深度调用链,甚至 StackOverflowException [原文如此]。
  • 您对问题类型的减少缺乏一点上下文 - 起初。当然,您展示的内容失败了,但我会猜到的。在问之前我没有猜到是“我在我的问题中构建的任何流在哪里/为什么被重复使用”。如果您对此有所扩展,我很乐意将您的答案标记为已接受。
  • @GPI - 我可能过于简单化了,但你所描述的正是我正在提出的观点(或至少试图这样做)。您在多级缩减中有一个 Stream 身份对象。我想这回答了你的问题(它问 我看不到代码的哪一部分消耗/作用于流)。关于接受,我相信还有时间给出更好的答案,但也欢迎您编辑此答案!
  • @ernest_k 我已经以一种对我来说很清楚的方式进行了编辑。如果您觉得它与您的意思不同,请随时恢复。
  • @GPI 您的版本完美地说明了同样的问题。谢谢!
【解决方案2】:

主要问题是你不能将流作为标识元素,因为流不能被重用,所以当它试图重用它时,抛出说它被操作或关闭。

这是一种替代方法(返回 List 而不是 Optional):

Map<String, List<String>> collect = entries.stream().collect(
    Collectors.groupingBy(
        e -> e.key,
        Collectors.flatMapping(e -> e.values.stream(), Collectors.toList())))

【讨论】:

  • flatMapping 很好,但我是 Java 8(我没有明确标记 Java 8,所以不用担心)。有许多解决方案可供选择,例如创建一个简单的Collector.of(HashSet::new, (acc, value) -&gt; acc.addAll(value), (acc1, acc2) -&gt; acc1.addAll(acc2))̀,使用CollectingAndThen(toList(), lists -&gt; lists.stream.distinct.collect)...问题的症结在于Stream.empty() 不再像Stream.of(...) 那样可重用,正如您所提到的。
猜你喜欢
  • 1970-01-01
  • 2018-04-29
  • 2020-12-23
  • 2013-05-31
  • 1970-01-01
相关资源
最近更新 更多