有没有一种直接的方法可以将带有流的代码转换为带有迭代器的代码?或者有没有一种简单的方法可以让我的第一次尝试更有效率?
@Will Ness 使用 Streams 为您提供了一个改进的答案,并给出了为什么您的代码在早期添加流和左倾线性结构时占用如此多内存和时间的原因,但没有人完全回答第二个(或也许是主要的)您的问题的一部分,即真正的 Eratosthenes 增量筛是否可以用迭代器实现。
首先,我们应该正确地相信这个右倾算法,您的第一个代码是一个粗略的(左倾)示例(因为它过早地将所有主要复合流添加到合并操作中),这是由于 Richard Bird在Melissa E. O'Neill's definitive paper on incremental Sieve's of Eratosthenes的后记中。
第二,不,在这个算法中用迭代器代替流是不可能的,因为它依赖于在流中移动而不重新启动流,虽然可以访问迭代器的头部(当前位置),使用下一个值(跳过头部)将迭代的其余部分生成为流需要以可怕的内存和时间成本构建一个全新的迭代器。但是,我们可以使用迭代器来输出素数序列的结果,以最大程度地减少内存使用,并使迭代器高阶函数的使用变得容易,正如您将在下面的代码中看到的那样。
现在 Will Ness 已经向您介绍了将主要复合流添加到计算中的原则,直到需要它们时,当将它们存储在诸如优先级队列或 HashMap 之类的结构中时效果很好,甚至在O'Neill 的论文,但对于 Richard Bird 算法,这不是必需的,因为在需要之前不会访问未来的流值,因此不会存储 如果流被正确地延迟构建(就像延迟和左倾)。实际上,该算法甚至不需要完整 Stream 的记忆和开销,因为每个复合数剔除序列仅向前移动而不参考任何过去的素数,除了一个需要单独的基本素数来源,可以由相同算法的递归调用。
为了方便参考,我们将 Richard Bird 算法的 Haskell 代码列举如下:
primes = 2:([3..] ‘minus‘ composites)
where
composites = union [multiples p | p <− primes]
multiples n = map (n*) [n..]
(x:xs) ‘minus‘ (y:ys)
| x < y = x:(xs ‘minus‘ (y:ys))
| x == y = xs ‘minus‘ ys
| x > y = (x:xs) ‘minus‘ ys
union = foldr merge []
where
merge (x:xs) ys = x:merge’ xs ys
merge’ (x:xs) (y:ys)
| x < y = x:merge’ xs (y:ys)
| x == y = x:merge’ xs ys
| x > y = y:merge’ (x:xs) ys
在下面的代码中,我简化了“minus”函数(称为“minusStrtAt”),因为我们不需要构建一个全新的流,但可以将复合减法运算与原始流的生成结合起来(在我的例子中仅赔率)序列。我还简化了“联合”功能(将其重命名为“mrgMltpls”)
流操作被实现为非记忆通用共感应流 (CIS) 作为通用类,其中类的第一个字段是流当前位置的值,第二个字段是 thunk(零参数通过嵌入闭包参数将流的下一个值返回给另一个函数的函数)。
def primes(): Iterator[Long] = {
// generic class as a Co Inductive Stream element
class CIS[A](val v: A, val cont: () => CIS[A])
def mltpls(p: Long): CIS[Long] = {
var px2 = p * 2
def nxtmltpl(cmpst: Long): CIS[Long] =
new CIS(cmpst, () => nxtmltpl(cmpst + px2))
nxtmltpl(p * p)
}
def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
else new CIS(b.v, () => merge(a.cont(), b.cont()))
def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(mlps.cont())))
def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
else minusStrtAt(n + 2, cmpsts.cont())
// the following are recursive, where cmpsts uses oddPrms and
// oddPrms uses a delayed version of cmpsts in order to avoid a race
// as oddPrms will already have a first value when cmpsts is called to generate the second
def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
Iterator.iterate(new CIS(2L, () => oddPrms()))
{(cis: CIS[Long]) => cis.cont()}
.map {(cis: CIS[Long]) => cis.v}
}
上面的代码在大约 1.3 秒内生成 ideone 上的第 100,000 个素数 (1299709),开销约为 0.36 秒,并且经验计算复杂度为 600,000 个素数,约为 1.43。内存使用量在程序代码使用量之上可以忽略不计。
上面的代码可以使用内置的 Scala Streams 来实现,但是这个算法不需要性能和内存使用开销(常数因子)。使用 Streams 意味着可以直接使用它们而无需额外的 Iterator 生成代码,但由于这仅用于序列的最终输出,因此成本并不高。
要按照 Will Ness 的建议实现一些基本的树折叠,只需添加一个“pairs”函数并将其挂钩到“mrgMltpls”函数中:
def primes(): Iterator[Long] = {
// generic class as a Co Inductive Stream element
class CIS[A](val v: A, val cont: () => CIS[A])
def mltpls(p: Long): CIS[Long] = {
var px2 = p * 2
def nxtmltpl(cmpst: Long): CIS[Long] =
new CIS(cmpst, () => nxtmltpl(cmpst + px2))
nxtmltpl(p * p)
}
def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
else new CIS(b.v, () => merge(a.cont(), b.cont()))
def pairs(mltplss: CIS[CIS[Long]]): CIS[CIS[Long]] = {
val tl = mltplss.cont()
new CIS(merge(mltplss.v, tl.v), () => pairs(tl.cont()))
}
def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(pairs(mlps.cont()))))
def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
else minusStrtAt(n + 2, cmpsts.cont())
// the following are recursive, where cmpsts uses oddPrms and
// oddPrms uses a delayed version of cmpsts in order to avoid a race
// as oddPrms will already have a first value when cmpsts is called to generate the second
def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
Iterator.iterate(new CIS(2L, () => oddPrms()))
{(cis: CIS[Long]) => cis.cont()}
.map {(cis: CIS[Long]) => cis.v}
}
上面的代码在大约 0.75 秒内生成 ideone 上的第 100,000 个素数 (1299709),开销约为 0.37 秒,并且对第 1,000,000 个素数 (15485863) 的经验计算复杂度约为 1.09(5.13 秒)。内存使用量在程序代码使用量之上可以忽略不计。
请注意,上面的代码是完全正常的,因为没有使用任何可变状态,但是 Bird 算法(甚至是树折叠版本)不如使用优先级队列或 HashMap 来处理更大的范围处理树合并的操作数具有比优先级队列的 log n 开销或 HashMap 的线性(摊销)性能更高的计算复杂度(尽管处理散列有很大的常数因子开销,因此优势是'直到使用一些真正大的范围才真正看到)。
这些代码使用如此少内存的原因是 CIS 流的制定没有永久引用流的开始,因此流在使用时会被垃圾收集,只留下最少数量的基本素数复合序列占位符,正如 Will Ness 所解释的那样,它非常小 - 只有 546 个基本素数复合数流,用于生成前一百万个素数,直到 15485863,每个占位符只占用几十个字节(8 个用于长数,8 个用于64 位函数引用,另外几个字节用于指向闭包参数的指针,另外几个字节用于函数和类开销,每个流占位符的总数可能为 40 个字节,或者总共不超过 20 KB用于生成一百万个素数的序列)。