【问题标题】:Monadic fold with State monad in constant space (heap and stack)?状态单子在恒定空间(堆和堆栈)中的单子折叠?
【发布时间】:2014-01-12 10:23:51
【问题描述】:

是否可以在 State monad 中的恒定堆栈和堆空间中执行折叠?还是其他功能技术更适合我的问题?

接下来的部分描述了问题和一个激励用例。我正在使用 Scala,但也欢迎使用 Haskell 中的解决方案。


折叠State Monad 填满堆

假设 Scalaz 7。考虑状态单子中的单子折叠。为了避免堆栈溢出,我们将折叠折叠。

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

对于大型集合 col,这将填满堆。

我相信在折叠过程中,会为集合中的每个值(x: R 参数)创建一个闭包(一个 State mobit),填充堆。在执行run 0 之前,这些都不能被评估,提供初始状态。

可以避免这种 O(n) 堆使用吗?

更具体地说,是否可以在折叠之前提供初始状态,以便 State monad 可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?

或者可以构造折叠以便在状态单子为run 之后延迟执行?这样,下一个 x: R 闭包将在前一个闭包被评估并适合垃圾回收之后才会创建。

或者这种工作有更好的功能范式吗?


示例应用程序

但也许我使用了错误的工具来完成这项工作。示例用例的演变如下。我是不是走错了路?

考虑reservoir sampling,即从一个太大而无法放入内存的集合中一次性挑选一个统一的随机k项目。在 Scala 中,这样的函数可能是

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

如果拉皮条到TraversableOnce 类型可以这样使用

val tenRandomInts = (Int.Min to Int.Max) sample 10

sample所做的工作本质上是fold

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

但是,update 是有状态的;这取决于n,已经看到的项目数量。 (它也依赖于一个 RNG,但为了简单起见,我假设它是全局的和有状态的。用于处理 n 的技术将很容易扩展。)。那么如何处理这种状态呢?

不纯的解决方案很简单,并以恒定的堆栈和堆运行。

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

但是纯粹的功能解决方案呢? update 必须将n 作为附加参数,并与更新的样本一起返回新值。我们可以在隐式状态中包含n,折叠累加器,例如,

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

但这掩盖了意图;我们只是真的打算累积样本向量。这个问题似乎已经为 State monad 和 monadic left fold 做好了准备。让我们再试一次。

我们将使用带有这些导入的 Scalaz 7

import scalaz._
import Scalaz._
import scalaz.std.iterable_

并在 Iterable[A] 上进行操作,因为 Scalaz 不支持 Traversable 的一元折叠。

sample 现已定义

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

更新在哪里

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

不幸的是,这会破坏大量集合的堆栈。

所以让我们蹦床吧。 sample 现在是

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

更新在哪里

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

这修复了堆栈溢出,但仍然会破坏非常大的集合(或非常小的堆)的堆。每个匿名函数 集合中的值是在折叠期间创建的(我相信关闭每个 x: A 参数),甚至在蹦床运行之前消耗堆。 (FWIW,State 版本也有这个问题;堆栈溢出首先出现在较小的集合中。)

【问题讨论】:

  • 我不认为你的猜测是准确的,在堆上创建了“每个值一个函数”,这就是在吞噬你的记忆。复合函数是延迟创建的。想想看。当你说f = s =&gt; bigFun() 时,bigFun 直到你通过s 才真正发生。此时f 可以被丢弃除非你坚持。更可能的情况是您的收藏过于严格。尝试使用EphemeralStream 并比较结果。
  • 延迟创建是我最初的理解,但我看到创建了那些闭包(使用分析器)。它是在提供初始状态并运行蹦床之后,但在蹦床实际执行每件事之前。在你的答案中查看我的 cmets。
  • 顺便说一下,一旦我的困惑得到解决,我将编辑我的问题以删除红鲱鱼(例如,集合是否适合内存。这实际上并不相关;只是大 O 堆的使用一元折叠...)

标签: scala functional-programming monads scalaz scalaz7


【解决方案1】:

我们真正的问题是未执行的 State mobits 使用的堆。

不,不是。真正的问题是集合不适合内存并且foldLeftMfoldRightM 强制整个集合。不纯解决方案的一个副作用是您正在释放内存。在“纯功能”解决方案中,您不会在任何地方这样做。

您对Iterable 的使用忽略了一个关键细节:col 实际上是什么类型的集合,它的元素是如何创建的,以及它们应该如何被丢弃。因此,foldLeftM 必然会在 Iterable 上进行。它可能过于严格,并且您将整个集合强制放入内存。例如,如果它是Stream,那么只要你坚持col,到目前为止所有强制的元素都将在内存中。如果是其他类型的懒惰Iterable 不记住它的元素,那么折叠仍然太严格了。

我用EphemeralStream 尝试了您的第一个示例,没有看到任何显着的堆压力,即使它显然具有相同的“未执行状态mobits”。不同之处在于EphemeralStream 的元素被弱引用,其foldRight 不会强制整个流。

我怀疑如果您使用Foldable.foldr,那么您不会看到有问题的行为,因为它与一个在其第二个参数 中惰性的函数折叠。当你调用折叠时,你希望它立即返回一个看起来像这样的暂停:

Suspend(() => head |+| tail.foldRightM(...))

当蹦床恢复第一次暂停并运行到下一次暂停时,暂停之间的所有分配都将可供垃圾收集器释放。

尝试以下方法:

def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
  if (bs.isEmpty) Monad[M].point(a)
  else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))

val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._

foldM[M,R,Int](Monoid[R].zero, col) {
  (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run

对于蹦床单子M,这将在恒定堆中运行,但对于非蹦床单子,它将溢出堆栈。

真正的问题是Iterable 不是一个很好的抽象数据,因为数据太大而无法放入内存。当然,您可以编写一个命令式的副作用程序,在其中显式丢弃每次迭代后的元素或使用惰性右折叠。在您想将该程序与另一个程序组合之前,这很有效。我假设您在 State monad 中调查这样做的全部原因是为了获得组合性。

那你能做什么?以下是一些选项:

  1. 使用ReducerMonoid 及其组合,然后作为最后一步在命令式显式释放循环(或蹦床式惰性右折叠)中运行,然后组合是不可能的或预期的。
  2. 使用Iteratee 组合和一元Enumerators 来喂它们。
  3. 使用Scalaz-Stream 编写组合流转换器。

最后一个选项是我在一般情况下会使用和推荐的选项。

【讨论】:

  • 对于我的测试,我使用了匿名的new Iterator{...}(只是增加了var: Int)。这不会在内存中保存先前的元素(通过示例应用程序中的有状态解决方案确认)。它的行为在其他 sample 实现上应该是相同的。
  • 我不关心某些要求所有元素都在内存中的 Iterable 集合——在选择集合时应该考虑这一点。我担心 foldLeftM[State[B]](...)(...) 使用 additional O(n) 堆空间。 (我应该在问题上更具体;我只是认为“太大而无法记忆”的解释更容易解释。)
  • 要清楚,我并不是要对您的分析提出异议(见鬼,我从您关于免费单子和堆栈溢出的论文中了解到StateT[Trampoline, S, B]),请了解我的问题的根本原因.您的三个建议可能更适合我的问题(谢谢!),但我想了解为什么我的 foldLeftM[StateT[Trampoline, S, B] 没有使用恒定的附加堆。
  • 在 N 个元素集合上使用 foldLeftM,我看到创建了 N 个 scalaz.IterableInstances$$anonfunc$foldRight$1$1$$anonfun$apply$1 函数,每个函数都引用了集合元素上的 scalaz.Foldable$$anonfun$foldLeftM$2$$anonfun$apply$10 闭包。这些都是在调用 Trampoline.run 时立即创建的,在执行和展开之前。
  • 更新了答案以反映您为什么使用EphemeralStream
【解决方案2】:

使用State 或任何类似的单子不是解决问题的好方法。 使用State 注定会破坏大型集合的堆栈/堆。 考虑从一个大集合构造的值x: State[A,B](对于 例如通过折叠它)。然后x 可以在初始状态A 的不同值上进行评估,产生不同的结果。所以x需要保留所有信息 包含在集合中。在纯粹的设置中,x 不能忘记一些 信息不会破坏堆栈/堆,因此计算的任何内容都保留在 内存直到整个 monadic 值被释放,这仅在 结果被评估。所以x的内存消耗与集合的大小成正比。

我认为解决这个问题的合适方法是使用函数式iteratees/pipes/conduits。发明这个概念(在这三个名称下)是为了处理具有恒定内存消耗的大量数据集合,并使用简单的组合器来描述此类过程。

我尝试使用 Scalaz'Iteratees,但这部分似乎还不成熟,它会像State 一样遭受堆栈溢出(或者我可能没有正确使用它;代码可用here,如果有人感兴趣)。

但是,使用我的(仍然有点实验性)scala-conduit 库很简单(免责声明:我是作者):

import conduit._
import conduit.Pipe._

object Run extends App {
  // Define a sampling function as a sink: It consumes
  // data of type `A` and produces a vector of samples.
  def sampleI[A](k: Int): Sink[A, Vector[A]] =
    sampleI[A](k, 0, Vector())

  // Create a sampling sink with a given state. It requests
  // a value from the upstream conduit. If there is one,
  // update the state and continue (the first argument to `requestF`).
  // If not, return the current sample (the second argument).
  // The `Finalizer` part isn't important for our problem.
  private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
                  Sink[A, Vector[A]] =
    requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
             (_: Any) => sample)(Finalizer.empty)


  // The sampling algorithm copied from the question.
  val rand = new scala.util.Random()

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
      sample :+ x // must keep first k elements
    } else {
      val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
      if (r <= k)
        sample.updated(r - 1, x) // sample is 0-index
      else
        sample
    }
  }

  // Construct an iterable of all `short` values, pipe it into our sampling
  // funcition, and run the combined pipe.
  {
    print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
          sampleI(10)))
  }
}

更新:使用State 可以解决问题,但我们需要专门为State 实现一个自定义折叠,它知道如何做到这一点恒定空间:

import scala.collection._
import scala.language.higherKinds
import scalaz._
import Scalaz._
import scalaz.std.iterable._

object Run extends App {
  // Folds in a state monad over a foldable
  def stateFold[F[_],E,S,A](xs: F[E],
                            f: (A, E) => State[S,A],
                            z: A)(implicit F: Foldable[F]): State[S,A] =
    State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))


  // Sample a lazy collection view
  def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
                  State[Int,Vector[A]] =
    stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())

  // update using State monad
  def update[A](k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
  }

  def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...

  {
    print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
  }
}

【讨论】:

  • 你的第一段与我对正在发生的事情的理解相吻合——单子用自己的 O(N) 大小的一组闭包引用了整个集合,并且在提供之前无法展开/释放这些闭包初始状态。我相信@Apocalisp 是说,对于 fold 的适当实现,在提供初始状态并运行蹦床之前不会迭代集合——加载下一个元素时可以释放一个元素。
  • 无论如何,你们俩建议的Iteratee/conduit 方法应该避免所有这些复杂性和折叠实现特定的头痛。
  • @DavidB。另一个选项(在答案中更新)是创建一个特定的函数,用于在恒定空间中折叠 State monad。我想这是 Tramolined monad 方法的一半。
猜你喜欢
  • 1970-01-01
  • 2017-04-24
  • 1970-01-01
  • 2020-07-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多