【问题标题】:Java Streams - grouping items on sorted streams efficientlyJava Streams - 有效地对已排序流上的项目进行分组
【发布时间】:2015-12-04 04:42:29
【问题描述】:

我正在寻找一种方法来实现非终端分组操作,这样内存开销就会最小。

例如,考虑 distinct()。在一般情况下,它别无选择,只能收集所有不同的项目,然后才将它们向前传输。但是,如果我们知道输入流已经排序,则操作可以“即时”完成,使用最少的内存。

我知道我可以使用迭代器包装器为迭代器实现这一点,并自己实现分组逻辑。有没有更简单的方法来使用流 API 来实现这一点?

--编辑--

我找到了一种滥用 Stream.flatMap(..) 来实现此目的的方法:

  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }    
  }

然后:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);

哪些打印:

1
3
4
5

经过一些更改,相同的技术可用于任何类型的内存高效序列分组流。无论如何,我不太喜欢这个解决方案,我一直在寻找更自然的东西(例如映射或过滤的工作方式)。此外,我在这里违反合同,因为提供给 flatMap(..) 的函数是有状态的。

【问题讨论】:

  • 您总是可以.filter(someSet::add),但是您是否尝试过将这种解决方案的性能与普通的distinct() 进行比较?另外,您说“在一般情况下”,但如果Stream ORDERED,确切地说(或更准确地说,它的底层@ 987654328@)
  • @fge:我不确定那里有任何优化。代码: IntStream.range(0, 100000000).distinct().forEach(x->{});尽管底层 Spliterator 报告自己是 ORDERED,但内存不足。
  • 你试过.forEachOrdered()吗?
  • DISTINCTSORTED 都有。但是 - 查看 jdk8 代码 - IntStream 实现没有使用.distinct() 中的任何一个。似乎是基于引用的流。
  • @the8472:我猜重要的特征是SORTED,我的例子中生成的流有它。

标签: java java-8 java-stream


【解决方案1】:

如果您想要一个不向不应该拥有可变状态的函数添加可变状态的解决方案,您可以求助于collect

static void distinctForSorted(IntStream s, IntConsumer action) {
    s.collect(()->new long[]{Long.MIN_VALUE},
              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
              (a, b)->{ throw new UnsupportedOperationException(); });
}

这是有效的,因为它是使用可变容器的预期方式,但是,它不能并行工作,因为在任意流位置拆分意味着可能在两个(甚至更多)线程中遇到一个值。

如果您想要一个通用的IntStream 而不是forEach 操作,则首选Spliterator 低级别解决方案,尽管会增加复杂性。

static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp=s.spliterator();
    return StreamSupport.intStream(
      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
        long last=Long.MIN_VALUE;
        @Override
        public boolean tryAdvance(IntConsumer action) {
            long prev=last;
            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);
            return true;
        }
        @Override
        public void forEachRemaining(IntConsumer action) {
            sp.forEachRemaining(distinct(action));
        }
        @Override
        public Comparator<? super Integer> getComparator() {
            return null;
        }
        private IntConsumer distinct(IntConsumer c) {
            return i-> {
                if(i==last) return;
                assert i>last;
                last=i;
                c.accept(i);
            };
        }
    }, false);
}

它甚至继承了并行支持,尽管它通过在另一个线程中处理它们之前预取一些值来工作,因此它不会加速 distinct 操作,但如果有计算,可能会进行后续操作激烈的。


为了补全,这里有一个针对任意(即未排序)IntStreams 的独特操作,它不依赖于“boxing plus HashMap”,因此可能具有更好的内存占用:

static IntStream distinct(IntStream s) {
    boolean parallel=s.isParallel();
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s=s.parallel();
    return s;
}

它仅适用于正的int 值;将其扩展到完整的 32 位范围将需要两个 BitSets,因此看起来不够简洁,但通常用例允许将存储限制在 31 位范围甚至更低……

【讨论】:

  • 谢谢。我现在看到自定义 Spliterator 是这样做的方法(就像 Stuart Marks 建议的在 stackoverflow.com/q/283​​63323/1441122 中一样)。顺便说一下,最后的 bitset 解决方案很优雅(尽管内存使用量仍然为 O(n))。
【解决方案2】:

正确执行此操作的方法是将流转换为拆分器,然后根据返回的拆分器的属性对其进行包装

  • 如果源既未排序也未区分,则使用并发集执行简单的重复数据删除
  • 如果源拆分器已排序,则执行优化优化去重。
    支持 trySplit 操作将很棘手,因为它可能必须将子拆分器推进几步,直到它可以确定它没有看到一个尾部运行不明确的元素。
  • 如果源已经不同,则按原样返回拆分器

一旦你有了那个分离器,你就可以把它变成一个具有相同属性的流,并继续对其进行流操作

由于我们无法修改现有的 jdk 接口,因此辅助 API 必须看起来更像这样:dedup(IntStream.of(...).map(...)).collect(...)


如果您检查 java.util.stream.DistinctOps.makeRef(AbstractPipeline&lt;?, T, ?&gt;) 的来源,您会注意到 JDK 或多或少地对基于引用的流执行此操作。

只是 IntStream 实现 (java.util.stream.IntPipeline.distinct()) 采用了没有利用 DISTINCTSORTED 的低效方法。

它只是盲目地将 IntStream 转换为装箱的Integer 流,并使用基于引用的重复数据删除,而不传递适当的标志以提高内存效率。

如果 jdk9 中尚未修复此问题,则可能值得一个 bug,因为如果流操作不必要地丢弃流标志,它本质上是不必要的内存消耗和浪费优化潜力。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-10-05
    • 2017-06-05
    • 2016-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多