【问题标题】:Scala, Erastothenes: Is there a straightforward way to replace a stream with an iteration?Scala,Erastothenes:有没有一种直接的方法可以用迭代替换流?
【发布时间】:2014-01-25 22:57:37
【问题描述】:

我编写了一个使用流无限生成素数的函数(维基百科:incremental sieve of Erastothenes)。它返回一个流,但它也在内部合并素数流以标记即将到来的复合。这个定义简洁、实用、优雅且易于理解,如果我自己这么说的话:

def primes(): Stream[Int] = {
  def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
    def next = a.head min b.head
    Stream.cons(next, merge(if (a.head == next) a.tail else a,
                            if (b.head == next) b.tail else b))
  }
  def test(n: Int, compositeStream: Stream[Int]): Stream[Int] = {
    if (n == compositeStream.head) test(n+1, compositeStream.tail)
    else Stream.cons(n, test(n+1, merge(compositeStream, Stream.from(n*n, n))))
  }
  test(2, Stream.from(4, 2))
}

但是,当我尝试生成第 1000 个素数时,我收到“java.lang.OutOfMemoryError: GC 开销限制超出”。

我有一个替代解决方案,它在素数上返回一个迭代器,并在内部使用元组的优先级队列(多个,用于生成多个的素数)来标记即将到来的复合。它运行良好,但它需要大约两倍的代码,我基本上不得不从头开始:

import scala.collection.mutable.PriorityQueue
def primes(): Iterator[Int] = {
  // Tuple (composite, prime) is used to generate a primes multiples
  object CompositeGeneratorOrdering extends Ordering[(Long, Int)] {
    def compare(a: (Long, Int), b: (Long, Int)) = b._1 compare a._1
  }
  var n = 2;
  val composites = PriorityQueue(((n*n).toLong, n))(CompositeGeneratorOrdering)
  def advance = {
    while (n == composites.head._1) { // n is composite
      while (n == composites.head._1) { // duplicate composites
        val (multiple, prime) = composites.dequeue
        composites.enqueue((multiple + prime, prime))
      }
      n += 1
    }
    assert(n < composites.head._1)
    val prime = n
    n += 1
    composites.enqueue((prime.toLong * prime.toLong, prime))
    prime
  }
  Iterator.continually(advance)
}

有没有一种直接的方法可以将带有流的代码转换为带有迭代器的代码?或者有没有一种简单的方法可以让我的第一次尝试更有效率?

从流的角度考虑更容易;我宁愿这样开始,然后在必要时调整我的代码。

【问题讨论】:

  • 请注意,我的第一个代码早在第 10,000 个素数时就会出现整数溢出问题。

标签: scala stream iterator out-of-memory primes


【解决方案1】:

我猜这是当前Stream 实现中的一个错误。

primes().drop(999).head 工作正常:

primes().drop(999).head
// Int = 7919

您将获得 OutOfMemoryError 和存储的 Stream,如下所示:

val prs = primes()

prs.drop(999).head
// Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

Consimplementation 类的问题:它不仅包含计算出的tail,还包含计算这个tail 的函数。即使 tail 被计算出来并且不再需要函数!

在这种情况下,函数非常繁重,因此即使存储了 1000 个函数,您也会得到 OutOfMemoryError

我们必须以某种方式删除这些功能。

直觉修复失败:

val prs = primes().iterator.toStream

prs.drop(999).head
// Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

iteratorStream 你会得到StreamIterator,在StreamIterator#toStream 你会get 初始重Stream

解决方法

所以我们必须手动转换:

def toNewStream[T](i: Iterator[T]): Stream[T] =
  if (i.hasNext) Stream.cons(i.next, toNewStream(i))
  else Stream.empty

val prs = toNewStream(primes().iterator)
// Stream[Int] = Stream(2, ?)

prs.drop(999).head
// Int = 7919

【讨论】:

  • 谢谢!我喜欢这种解决方法,或者至少,我喜欢它节省了我的实现。但是,我真的不明白。我查看了 Stream 的源代码,但我不明白它是如何出错的。你能详细说明一下吗?
  • @stewSquared:这是fixtlVal = tl tl 应该存储在一个字段中。 private[this] var tlFunc: () =&gt; Stream[A] = tl _ tl 在构造函数中使用。
【解决方案2】:

在您的第一个代码中,您应该推迟合并,直到在候选中看到素数的平方。这将大大减少正在使用的流的数量,从根本上改善您的内存使用问题。为了得到第 1000 个素数,7919,我们只需要考虑不高于其平方根的素数,88。这只是它们倍数的 23 个素数/流,而不是 99922,如果我们从一开始就忽略偶数)。对于第 10,000 个素数,它是具有 9999 个倍数流和只有 66 个流之间的区别。而对于第 100,000 个,只需要 189 个。

诀窍是通过递归调用将消耗的素数与产生的素数分开:

def primes(): Stream[Int] = {
  def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
    def next = a.head min b.head
    Stream.cons(next, merge(if (a.head == next) a.tail else a,
                            if (b.head == next) b.tail else b))
  }
  def test(n: Int, q: Int, 
                   compositeStream: Stream[Int], 
                   primesStream: Stream[Int]): Stream[Int] = {
    if (n == q) test(n+2, primesStream.tail.head*primesStream.tail.head,
                          merge(compositeStream, 
                                Stream.from(q, 2*primesStream.head).tail),
                          primesStream.tail)
    else if (n == compositeStream.head) test(n+2, q, compositeStream.tail,
                                                     primesStream)
    else Stream.cons(n, test(n+2, q, compositeStream, primesStream))
  }
  Stream.cons(2, Stream.cons(3, Stream.cons(5, 
     test(7, 25, Stream.from(9, 6), primes().tail.tail))))
}

作为一个额外的好处,没有必要将素数的平方存储为Longs。这也将更快并且具有更好的算法复杂性(时间和空间),因为它避免了做很多多余的工作。 Ideone testing 表明它运行在大约 ~ n1.5..1.6empirical orders of growth 产生多达 n = 80,000 个素数。

这里还有一个算法问题:这里创建的结构仍然是一个线性左倾结构(((mults_of_2 + mults_of_3) + mults_of_5) + ...),更频繁产生的流位于更深的内部(所以数字有更多的层次可以渗透,去向上)。右倾结构应该更好,mults_of_2 + (mults_of_3 + (mults_of_5 + ...))。把它变成一棵树应该会真正提高时间复杂度(通常将其降低到大约 ~ n1.2..1.25)。相关讨论见this haskellwiki page

Eratosthenes 的“真正”命令式筛子通常运行在 ~ n1.1 左右(在 n 个产生的素数中)和一个最佳试验划分在 ~ n1.40..1.45 筛分。 Your original code runs at 大约立方时间,或更糟。使用命令式可变数组通常是最快的,按段工作(也就是 Eratosthenes 的分段筛)。

在您的第二个代码的上下文中,this is how it is achieved in Python

【讨论】:

  • 是的!你说得对!在这两个方面。这些是可以对我的代码进行的明显优化。
【解决方案3】:

有没有一种直接的方法可以将带有流的代码转换为带有迭代器的代码?或者有没有一种简单的方法可以让我的第一次尝试更有效率?

@Will Ness 使用 Streams 为您提供了一个改进的答案,并给出了为什么您的代码在早期添加流和左倾线性结构时占用如此多内存和时间的原因,但没有人完全回答第二个(或也许是主要的)您的问题的一部分,即真正的 Eratosthenes 增量筛是否可以用迭代器实现。

首先,我们应该正确地相信这个右倾算法,您的第一个代码是一个粗略的(左倾)示例(因为它过早地将所有主要复合流添加到合并操作中),这是由于 Richard Bird在Melissa E. O'Neill's definitive paper on incremental Sieve's of Eratosthenes的后记中。

第二,不,在这个算法中用迭代器代替流是不可能的,因为它依赖于在流中移动而不重新启动流,虽然可以访问迭代器的头部(当前位置),使用下一个值(跳过头部)将迭代的其余部分生成为流需要以可怕的内存和时间成本构建一个全新的迭代器。但是,我们可以使用迭代器来输出素数序列的结果,以最大程度地减少内存使用,并使迭代器高阶函数的使用变得容易,正如您将在下面的代码中看到的那样。

现在 Will Ness 已经向您介绍了将主要复合流添加到计算中的原则,直到需要它们时,当将它们存储在诸如优先级队列或 HashMap 之类的结构中时效果很好,甚至在O'Neill 的论文,但对于 Richard Bird 算法,这不是必需的,因为在需要之前不会访问未来的流值,因此不会存储 如果流被正确地延迟构建(就像延迟和左倾)。实际上,该算法甚至不需要完整 Stream 的记忆和开销,因为每个复合数剔除序列仅向前移动而不参考任何过去的素数,除了一个需要单独的基本素数来源,可以由相同算法的递归调用。

为了方便参考,我们将 Richard Bird 算法的 Haskell 代码列举如下:

primes = 2:([3..] ‘minus‘ composites)
  where
    composites = union [multiples p | p <− primes]
    multiples n = map (n*) [n..]
    (x:xs) ‘minus‘ (y:ys)
      | x < y = x:(xs ‘minus‘ (y:ys))
      | x == y = xs ‘minus‘ ys
      | x > y = (x:xs) ‘minus‘ ys
    union = foldr merge []
      where
        merge (x:xs) ys = x:merge’ xs ys
        merge’ (x:xs) (y:ys)
          | x < y = x:merge’ xs (y:ys)
          | x == y = x:merge’ xs ys
          | x > y = y:merge’ (x:xs) ys

在下面的代码中,我简化了“minus”函数(称为“minusStrtAt”),因为我们不需要构建一个全新的流,但可以将复合减法运算与原始流的生成结合起来(在我的例子中仅赔率)序列。我还简化了“联合”功能(将其重命名为“mrgMltpls”)

流操作被实现为非记忆通用共感应流 (CIS) 作为通用类,其中类的第一个字段是流当前位置的值,第二个字段是 thunk(零参数通过嵌入闭包参数将流的下一个值返回给另一个函数的函数)。

def primes(): Iterator[Long] = {
  // generic class as a Co Inductive Stream element
  class CIS[A](val v: A, val cont: () => CIS[A])

  def mltpls(p: Long): CIS[Long] = {
    var px2 = p * 2
    def nxtmltpl(cmpst: Long): CIS[Long] =
      new CIS(cmpst, () => nxtmltpl(cmpst + px2))
    nxtmltpl(p * p)
  }
  def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
    new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
  def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
    if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
    else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
    else new CIS(b.v, () => merge(a.cont(), b.cont()))
  def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
    new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(mlps.cont())))
  def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
    if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
    else minusStrtAt(n + 2, cmpsts.cont())
  // the following are recursive, where cmpsts uses oddPrms and
  // oddPrms uses a delayed version of cmpsts in order to avoid a race
  // as oddPrms will already have a first value when cmpsts is called to generate the second
  def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
  def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
  Iterator.iterate(new CIS(2L, () => oddPrms()))
                   {(cis: CIS[Long]) => cis.cont()}
    .map {(cis: CIS[Long]) => cis.v}
}

上面的代码在大约 1.3 秒内生成 ideone 上的第 100,000 个素数 (1299709),开销约为 0.36 秒,并且经验计算复杂度为 600,000 个素数,约为 1.43。内存使用量在程序代码使用量之上可以忽略不计。

上面的代码可以使用内置的 Scala Streams 来实现,但是这个算法不需要性能和内存使用开销(常数因子)。使用 Streams 意味着可以直接使用它们而无需额外的 Iterator 生成代码,但由于这仅用于序列的最终输出,因此成本并不高。

要按照 Will Ness 的建议实现一些基本的树折叠,只需添加一个“pairs”函数并将其挂钩到“mrgMltpls”函数中:

def primes(): Iterator[Long] = {
  // generic class as a Co Inductive Stream element
  class CIS[A](val v: A, val cont: () => CIS[A])

  def mltpls(p: Long): CIS[Long] = {
    var px2 = p * 2
    def nxtmltpl(cmpst: Long): CIS[Long] =
      new CIS(cmpst, () => nxtmltpl(cmpst + px2))
    nxtmltpl(p * p)
  }
  def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
    new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
  def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
    if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
    else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
    else new CIS(b.v, () => merge(a.cont(), b.cont()))
  def pairs(mltplss: CIS[CIS[Long]]): CIS[CIS[Long]] = {
    val tl = mltplss.cont()
    new CIS(merge(mltplss.v, tl.v), () => pairs(tl.cont()))
  }
  def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
    new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(pairs(mlps.cont()))))
  def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
    if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
    else minusStrtAt(n + 2, cmpsts.cont())
  // the following are recursive, where cmpsts uses oddPrms and
  // oddPrms uses a delayed version of cmpsts in order to avoid a race
  // as oddPrms will already have a first value when cmpsts is called to generate the second
  def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
  def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
  Iterator.iterate(new CIS(2L, () => oddPrms()))
                   {(cis: CIS[Long]) => cis.cont()}
    .map {(cis: CIS[Long]) => cis.v}
}

上面的代码在大约 0.75 秒内生成 ideone 上的第 100,000 个素数 (1299709),开销约为 0.37 秒,并且对第 1,000,000 个素数 (15485863) 的经验计算复杂度约为 1.09(5.13 秒)。内存使用量在程序代码使用量之上可以忽略不计。

请注意,上面的代码是完全正常的,因为没有使用任何可变状态,但是 Bird 算法(甚至是树折叠版本)不如使用优先级队列或 HashMap 来处理更大的范围处理树合并的操作数具有比优先级队列的 log n 开销或 HashMap 的线性(摊销)性能更高的计算复杂度(尽管处理散列有很大的常数因子开销,因此优势是'直到使用一些真正大的范围才真正看到)。

这些代码使用如此少内存的原因是 CIS 流的制定没有永久引用流的开始,因此流在使用时会被垃圾收集,只留下最少数量的基本素数复合序列占位符,正如 Will Ness 所解释的那样,它非常小 - 只有 546 个基本素数复合数流,用于生成前一百万个素数,直到 15485863,每个占位符只占用几十个字节(8 个用于长数,8 个用于64 位函数引用,另外几个字节用于指向闭包参数的指针,另外几个字节用于函数和类开销,每个流占位符的总数可能为 40 个字节,或者总共不超过 20 KB用于生成一百万个素数的序列)。

【讨论】:

    【解决方案4】:

    如果你只想要无限的素数流,我认为这是最优雅的方式:

    def primes = {
      def sieve(from : Stream[Int]): Stream[Int] = from.head #:: sieve(from.tail.filter(_ % from.head != 0))
      sieve(Stream.from(2))
    }
    

    【讨论】:

    • 请注意,问题中有Stream.from(n*n, n),因此primes不应过滤所有整数。
    • 我只是想提出一种简单的方法来获得无限的质数流,这也是stewSquared的算法最终也是如此。
    • 这确实是一个无限的素数流。但是,它使用的是试除法而不是 Erastothenes 筛法,也就是说,它很慢。 primes.drop(10000).head 在我的实现中需要 40 秒,而在我获得 GC 开销限制之前需要 3 分钟。阅读这篇文章:cs.hmc.edu/~oneill/papers/Sieve-JFP.pdf 另外,我不只是在寻找无限质数流,而是一种在没有 GC 开销限制的情况下使用 Streams 的方法。
    • 更好的版本:val primes: Stream[Int] = 2 #:: Stream.from(3, 2).filter(i =&gt; primes.takeWhile(j =&gt; j * j &lt;= i).forall(k =&gt; i % k &gt; 0))。运行primes.drop(10000).head 只需不到一秒的时间。
    • @JohnLandahl 感谢这个经典的试除法算法的代码! It runs at ~ n^1.45,对于 n = 25k..100k,如预期的那样。 :)
    猜你喜欢
    • 2021-11-13
    • 2010-10-25
    • 1970-01-01
    • 2020-03-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-01
    • 2020-01-01
    相关资源
    最近更新 更多