【发布时间】:2014-03-31 22:24:53
【问题描述】:
我正在尝试制作一个将流写入分桶文件的接收器:当达到特定条件(时间、文件大小等)时,关闭当前输出流并打开一个新输出流到一个新的存储桶文件。
我检查了如何在io 对象中创建不同的接收器,但没有太多示例。所以我试着按照resource 和chunkW 的编写方式。我最终得到了以下代码,为简单起见,桶目前仅由 Int 表示,但最终将是某种类型的输出流。
val buckets: Channel[Task, String, Int] = {
//recursion to step through the stream
def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = {
// Emit the value and repeat
def next(msg: String => Task[Int]) =
Process.emit(msg) ++
go(step)
Process.await[Task, String => Task[Int], String => Task[Int]](step)(
next
, Process.halt // TODO ???
, Process.halt) // TODO ???
}
//starting bucket
val acquire: Task[Int] = Task.delay {
val startBuck = nextBucket(0)
println(s"opening bucket $startBuck")
startBuck
}
//the write step
def step(os: Int): Task[String => Task[Int]] =
Task.now((msg: String) => Task.delay {
write(os, msg)
val newBuck = nextBucket(os)
if (newBuck != os) {
println(s"closing bucket $os")
println(s"opening bucket $newBuck")
}
newBuck
})
//start the Channel
Process.await(acquire)(
buck => go(step(buck))
, Process.halt, Process.halt)
}
def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") }
def nextBucket(b: Int) = b+1
这里面有很多问题:
-
step在开始时被传递一次,这在递归期间永远不会改变。我不确定如何在递归go中创建一个新的step任务,该任务将使用前一个任务中的存储桶 (Int),因为我必须提供一个字符串才能完成该任务。 -
await调用中的fallback和cleanup没有收到rcv的结果(如果有的话)。在io.resource函数中,它可以正常工作,因为资源是固定的,但是,在我的情况下,资源可能会在任何步骤发生变化。我将如何将对当前打开的存储桶的引用传递给这些回调?
【问题讨论】:
-
好的,与此同时,我创建了自己的
BucketedWriter extends Writer,可以与resource一起使用,但这是非常必要的(实现java api)。
标签: scala scalaz scalaz-stream