【发布时间】:2017-01-09 11:56:26
【问题描述】:
在回答this question 时,Odomontois 展示了如何实现一个惰性分组运算符,该运算符可以通过键对预先排序的流进行分组,而无需将整个内容存储在内存中。有没有办法用 Akka 的流(即源对象)做这样的事情?或者,有什么方法可以从 Akka 源中提取常规 Stream 对象,以便我可以使用 Odomontois 的 ChopBy?
这是一个完全失败的尝试,它不起作用:
implicit class SourceChopOps[T, NU](s: Source[T, NU]) {
def chopBy[U](f: T => U) = {
s.prefixAndTail(1)
.map(pt => (pt._1.head, pt._2))
.map {
case (prefix, tail) =>
// what to do with pulled off head???
tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here
}
}
}
}
【问题讨论】:
-
感谢@fGo 的信息。 Akka groupBy 是否以某种方式解决了将大多数中间数据保存在内存中的需求?在返回之前是否需要保留每个子流的数据?或者它是否避免使用一些非常巧妙的流量控制技巧来做到这一点?前一种情况是 ChopBy 背后的主要推动力,它一次只需要在内存中保存一个键的数据。
标签: scala akka akka-stream scala-streams