【问题标题】:How do I lazily concatenate streams?我如何懒惰地连接流?
【发布时间】:2015-07-06 19:26:05
【问题描述】:

我正在尝试实现一个在其实现中使用另一个自身实例的流。该流有一些附加的常量元素(带有 IntStream.concat),因此只要连接的流懒惰地创建非常量部分,这应该可以工作。我认为将StreamSupport.intStream overload taking a Supplier 与 IntStream.concat ("creates a lazily concatenated stream")一起使用应该足够懒惰,只能在需要元素时创建第二个拆分器,但即使创建流(不评估它)也会溢出堆栈。如何延迟连接流?


我正在尝试将流式素数筛从this answer 移植到Java。这个筛子使用了它自己的另一个实例(Python 代码中的ps = postponed_sieve())。如果我将最初的四个常量元素 (yield 2; yield 3; yield 5; yield 7;) 分解为它们自己的流,则很容易将生成器实现为拆分器:

/**
 * based on https://stackoverflow.com/a/10733621/3614835
 */
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
    private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
    private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
    private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
    private int p, q, c = 9;
    private Supplier<IntStream> s;
    PrimeSpliterator() {
        super(105097564 /* according to Wolfram Alpha */ - 4 /* in prefix */,
                CHARACTERISTICS);
        //p = next(ps) and next(ps) (that's Pythonic?)
        postponedSieve.nextInt();
        this.p = postponedSieve.nextInt();
        this.q = p*p;
    }

    @Override
    public boolean tryAdvance(IntConsumer action) {
        for (; c > 0 /* overflow */; c += 2) {
            Supplier<IntStream> maybeS = sieve.remove(c);
            if (maybeS != null)
                s = maybeS;
            else if (c < q) {
                action.accept(c);
                return true; //continue
            } else {
                s = () -> IntStream.iterate(q+2*p, x -> x + 2*p);
                p = postponedSieve.nextInt();
                q = p*p;
            }
            int m = s.get().filter(x -> !sieve.containsKey(x)).findFirst().getAsInt();
            sieve.put(m, s);
        }
        return false;
    }
}

我对 primes() 方法的第一次尝试返回一个 IntStream,它将一个常量流与一个新的 PrimeSpliterator 连接起来:

public static IntStream primes() {
    return IntStream.concat(IntStream.of(2, 3, 5, 7),
            StreamSupport.intStream(new PrimeSpliterator()));
}

调用 primes() 会导致 StackOverflowError,因为 primes() 总是实例化 PrimeSpliterator,但 PrimeSpliterator 的字段初始化程序总是调用 primes()。但是,StreamSupport.intStream 的重载需要一个供应商,这应该允许懒惰地创建 PrimeSpliterator:

public static IntStream primes() {
    return IntStream.concat(IntStream.of(2, 3, 5, 7),
            StreamSupport.intStream(PrimeSpliterator::new, PrimeSpliterator.CHARACTERISTICS, false));
}

但是,我反而得到了一个 StackOverflowError 与不同的回溯(修剪,因为它重复)。请注意,递归完全在对 primes() 的调用中——终端操作 iterator() 永远不会在返回的流上调用。

Exception in thread "main" java.lang.StackOverflowError
    at java.util.stream.StreamSpliterators$DelegatingSpliterator$OfInt.<init>(StreamSpliterators.java:582)
    at java.util.stream.IntPipeline.lazySpliterator(IntPipeline.java:155)
    at java.util.stream.IntPipeline$Head.lazySpliterator(IntPipeline.java:514)
    at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:352)
    at java.util.stream.IntPipeline.spliterator(IntPipeline.java:181)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
    at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
    at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
    at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
    at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
    at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
    at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
    at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
    at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)

我怎样才能足够延迟地连接流以允许流在其实现中使用其自身的另一个副本?

【问题讨论】:

  • @the8472 它在构造函数中被推进了两次,所以我看不出它是如何被延迟初始化的。 (我认为这个问题仍然有效,因为 IntStream.concat 被证明是懒惰的。)
  • 它与惰性流无关,但x -&gt; x + 2*p 可能是一个错误,因为p 是一个成员变量,在计算 lambda 之前可能会发生变化。

标签: java java-8 java-stream


【解决方案1】:

您显然假设 Streams API 将其惰性保证扩展到拆分器的实例化;这是不正确的。它希望能够在实际消费开始之前的任何时间实例化流的拆分器,例如只是为了找出流的特征和报告的大小。消费只能从调用trySplittryAdvanceforEachRemaining 开始。

考虑到这一点,您将在需要之前初始化推迟的筛子。在tryAdvance 中的else if 部分之前,您无法使用它的任何结果。所以将代码移到最后可能的时刻,这会给出正确性:

@Override
public boolean tryAdvance(IntConsumer action) {
    for (; c > 0 /* overflow */; c += 2) {
        Supplier<IntStream> maybeS = sieve.remove(c);
        if (maybeS != null)
            s = maybeS;
        else {
            if (postponedSieve == null) {
              postponedSieve = primes().iterator();
              postponedSieve.nextInt();
              this.p = postponedSieve.nextInt();
              this.q = p*p;
            }
            if (c < q) {
              action.accept(c);
              return true; //continue

我认为,通过此更改,即使您第一次尝试 primes() 也应该有效。

如果您想保持当前的方法,您可以使用以下成语:

Stream.<Supplier<IntStream>>of(
  ()->IntStream.of(2, 3, 5, 7),
  ()->intStream(new PrimeSpliterator()))
.flatMap(Supplier::get);

你可能会发现这给了你尽可能多的懒惰。

【讨论】:

  • 它没有省略它,它把它放在不同的地方。
  • 不,这是我发布的第一个也是唯一一个代码版本。
  • 你有没有考虑过使用成语Stream.of(stream1, stream2).flatMap(identity()),它在语义上等同于concat,但可能更懒?
  • @TagirValeev 对流的供应商流进行平面映射应该可以解决这个问题(如我编辑的示例中所示)。传播报告的大小更加困难,因为这是一个先有鸡还是先有蛋的问题:在实例化拆分器之前,您无法找出报告的大小。
  • @Marko Topolnik:好吧,每个试图找到素数但不使用简单循环的人,BitSet 用于ints 或BigInteger.nextProbablePrime 用于更大的值,但Stream相反,API 似乎想在函数式编程中进行练习,而不是寻找简单实用的解决方案。在这方面,Stream API 通过将这些障碍准确地扔进程序员所寻求的方式中,做的一切都是正确的。
猜你喜欢
  • 2013-05-24
  • 2015-08-24
  • 1970-01-01
  • 2012-11-04
  • 1970-01-01
  • 1970-01-01
  • 2017-05-25
  • 2011-12-31
  • 1970-01-01
相关资源
最近更新 更多