【发布时间】:2014-01-12 10:23:51
【问题描述】:
是否可以在 State monad 中的恒定堆栈和堆空间中执行折叠?还是其他功能技术更适合我的问题?
接下来的部分描述了问题和一个激励用例。我正在使用 Scala,但也欢迎使用 Haskell 中的解决方案。
折叠State Monad 填满堆
假设 Scalaz 7。考虑状态单子中的单子折叠。为了避免堆栈溢出,我们将折叠折叠。
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
对于大型集合 col,这将填满堆。
我相信在折叠过程中,会为集合中的每个值(x: R 参数)创建一个闭包(一个 State mobit),填充堆。在执行run 0 之前,这些都不能被评估,提供初始状态。
可以避免这种 O(n) 堆使用吗?
更具体地说,是否可以在折叠之前提供初始状态,以便 State monad 可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?
或者可以构造折叠以便在状态单子为run 之后延迟执行?这样,下一个 x: R 闭包将在前一个闭包被评估并适合垃圾回收之后才会创建。
或者这种工作有更好的功能范式吗?
示例应用程序
但也许我使用了错误的工具来完成这项工作。示例用例的演变如下。我是不是走错了路?
考虑reservoir sampling,即从一个太大而无法放入内存的集合中一次性挑选一个统一的随机k项目。在 Scala 中,这样的函数可能是
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
如果拉皮条到TraversableOnce 类型可以这样使用
val tenRandomInts = (Int.Min to Int.Max) sample 10
sample所做的工作本质上是fold:
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
但是,update 是有状态的;这取决于n,已经看到的项目数量。 (它也依赖于一个 RNG,但为了简单起见,我假设它是全局的和有状态的。用于处理 n 的技术将很容易扩展。)。那么如何处理这种状态呢?
不纯的解决方案很简单,并以恒定的堆栈和堆运行。
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
但是纯粹的功能解决方案呢? update 必须将n 作为附加参数,并与更新的样本一起返回新值。我们可以在隐式状态中包含n,折叠累加器,例如,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
但这掩盖了意图;我们只是真的打算累积样本向量。这个问题似乎已经为 State monad 和 monadic left fold 做好了准备。让我们再试一次。
我们将使用带有这些导入的 Scalaz 7
import scalaz._
import Scalaz._
import scalaz.std.iterable_
并在 Iterable[A] 上进行操作,因为 Scalaz 不支持 Traversable 的一元折叠。
sample 现已定义
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
更新在哪里
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
不幸的是,这会破坏大量集合的堆栈。
所以让我们蹦床吧。 sample 现在是
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
更新在哪里
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
这修复了堆栈溢出,但仍然会破坏非常大的集合(或非常小的堆)的堆。每个匿名函数
集合中的值是在折叠期间创建的(我相信关闭每个 x: A 参数),甚至在蹦床运行之前消耗堆。 (FWIW,State 版本也有这个问题;堆栈溢出首先出现在较小的集合中。)
【问题讨论】:
-
我不认为你的猜测是准确的,在堆上创建了“每个值一个函数”,这就是在吞噬你的记忆。复合函数是延迟创建的。想想看。当你说
f = s => bigFun()时,bigFun直到你通过s才真正发生。此时f可以被丢弃除非你坚持。更可能的情况是您的收藏过于严格。尝试使用EphemeralStream并比较结果。 -
延迟创建是我最初的理解,但我看到创建了那些闭包(使用分析器)。它是在提供初始状态并运行蹦床之后,但在蹦床实际执行每件事之前。在你的答案中查看我的 cmets。
-
顺便说一下,一旦我的困惑得到解决,我将编辑我的问题以删除红鲱鱼(例如,集合是否适合内存。这实际上并不相关;只是大 O 堆的使用一元折叠...)
标签: scala functional-programming monads scalaz scalaz7