【问题标题】:generator/block to iterator/stream conversion生成器/块到迭代器/流的转换
【发布时间】:2011-04-17 09:31:13
【问题描述】:

基本上我想转换这个:

def data(block: T => Unit)

到一个流(dataToStream 是一个进行这种转换的假设函数):

val dataStream: Stream[T] = dataToStream(data)

我想这个问题可以通过延续来解决:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

谢谢,大卫

【问题讨论】:

  • 悬赏无线程答案,或令人信服的论点,没有任何答案。
  • 你的“块”没有任何价值。那怎么能变成流呢?单位是单例。
  • 所需的流是发送到“block”的一系列参数,而不是这些调用的结果。
  • 为什么需要流?有什么特殊原因吗? Traversable 或 TraversableView 为您提供了很大的灵活性。 map、flatMap、filter 等都是惰性的。它使用异常来防止在调用诸如 take 之类的方法时对“阻塞”的每次调用。总而言之,这里对 Stream 的需求似乎很琐碎,并且确实需要(A)使用线程能够在“数据”函数和流迭代之间来回交换堆栈。或 (B) 缓冲所有值并从此缓冲区创建 Stream。这更多的是你在 JVM 上拥有什么工具,虽然我很想感到惊讶
  • 这只是一个例子。我不在乎我最终会使用 Stream、Iterator 还是 Traversable。本质是将数据生成器转换为惰性、内存和 CPU 高效的“数据流”。

标签: scala callback yield continuations generator


【解决方案1】:

已编辑:修改示例以显示 traversable.view 的惰性

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

toTraversable 方法会将您的数据函数转换为 Traversable 集合。就其本身而言,它没什么大不了的,但您可以将其转换为惰性的 TraversableView。这是一个例子:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

take 方法的不幸性质是它必须超过最后一个生成的值才能正常工作,但它会提前终止。如果没有“.view”调用,上面的代码看起来是一样的。然而,这里有一个更引人注目的例子:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

因此,总而言之,我相信您正在寻找的集合是 TraversableView,它最容易创建创建 Traversable 的视图,然后在其上调用“视图”。如果你真的想要 Stream 类型,这里有一个在 2.8.0.final 中工作的方法,它会生成一个没有线程的“Stream”:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

这种方法的不幸之处在于它会在创建流之前遍历整个可遍历对象。这也意味着所有值都需要在内存中缓冲。唯一的选择是诉诸线程。

顺便说一句:这是更喜欢 Traversables 作为 scalax.io.File 方法的直接返回的动机:“lines”、“chars”和“bytes”。

【讨论】:

  • 如您所见,首先评估数据,然后将其转换为流。所以这里没有懒惰。
  • 我的观点是,如果您使用 TraversableView,您可以将数据作为“流”进行交互。通过要求类型“流”,您限制了自己。 TraversableView 懒惰的。
  • 如果可遍历视图在 REPL 中看起来并不懒惰,那是因为 REPL 在结果表达式上调用了“toString”,这将导致 TraversableView 遍历整个集合(显示所有值)。如果你使用 TraversableView 开发一个函数,你会看到它的惰性。
  • 嗯,确实不错。有时这种解决方案就足够了(尤其是当您想要遍历一行中的所有数据时),有时则不然。请参阅gist.github.com/603569 理想情况下,最后一个示例输出也应该是交错的。很遗憾你不能为它制作 Stream 或 Iterator 或者你可以,但它会首先评估所有数据。如果你有一个 Stream/Iterator,你可以并行使用两个或多个数据流。例如从另一个迭代器的那个 take(10) 中的 take(3)。无论如何,这是一段很棒且有用的代码!
  • 使用线程,当你没有消耗所有数据时,线程不会停止而是暂停。所以它也有缺点......
【解决方案2】:

这是一个简单的解决方案,它产生一个使用数据的线程。它将数据发布到 SynchronousQueue。创建并返回从队列中提取数据的流:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   

【讨论】:

【解决方案3】:

我仍然需要自己弄清楚如何做到这一点。我怀疑答案就在这里:

编辑:删除了显示如何解决不同问题的代码。

Edit2:使用最初发布http://gist.github.com/574873 的代码http://gist.github.com/580157,您可以这样做:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data 不采用块代码,但我认为这很好,因为通过继续,调用者可以处理块。生成器的代码可以在 github 上的 gist 中看到。

【讨论】:

  • Ehrm,你不是解决了与 OP 完全不同的问题吗? OP 的data 函数调用了block 函数十次,他想把它变成一个包含十个元素的流。你的data 函数只调用一次block
  • @sepp2k,错误,确实如此。那么我想继续是必要的。
  • 我尝试使用该线程stackoverflow.com/questions/2201882/…中的代码但没有成功
  • 是的,我以前试过。不幸的是,由于 CPS 的限制,它不能解决问题。见代码gist.github.com/599575 返回错误:类型不匹配;找到:单位 @scala.util.continuations.cpsParam[Unit,Unit] 需要:单位数据 { i => yld(i) }
  • @Dawid 查看我添加到该 sn-p 的评论。
【解决方案4】:

这是一个基于分隔的基于延续的实现,改编自 @Geoff Reedy 的产品:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-05-18
    • 2018-05-16
    • 2015-06-24
    • 2021-10-29
    • 2011-10-06
    • 1970-01-01
    • 1970-01-01
    • 2011-03-24
    相关资源
    最近更新 更多