【问题标题】:Implementing chopBy in Akka Streams在 Akka Streams 中实现 ChopBy
【发布时间】:2017-01-09 11:56:26
【问题描述】:

在回答this question 时,Odomontois 展示了如何实现一个惰性分组运算符,该运算符可以通过键对预先排序的流进行分组,而无需将整个内容存储在内存中。有没有办法用 Akka 的流(即源对象)做这样的事情?或者,有什么方法可以从 Akka 源中提取常规 Stream 对象,以便我可以使用 Odomontois 的 ChopBy?

这是一个完全失败的尝试,它不起作用:

  implicit class SourceChopOps[T, NU](s: Source[T, NU]) {
    def chopBy[U](f: T => U) = {
      s.prefixAndTail(1)
        .map(pt => (pt._1.head, pt._2))
        .map {
          case (prefix, tail) =>
            // what to do with pulled off head???
            tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here
        }
      }
    }
  }

【问题讨论】:

  • 你查过官方文档吗? doc.akka.io/docs/akka/2.4.9/scala/stream/…
  • 感谢@fGo 的信息。 Akka groupBy 是否以某种方式解决了将大多数中间数据保存在内存中的需求?在返回之前是否需要保留每个子流的数据?或者它是否避免使用一些非常巧妙的流量控制技巧来做到这一点?前一种情况是 ChopBy 背后的主要推动力,它一次只需要在内存中保存一个键的数据。

标签: scala akka akka-stream scala-streams


【解决方案1】:

groupBy 在 Akka Streams 中会将您分组的键保存在内存中,但流区域总是“惰性”的,因为它们具有背压,因此它将在有限的内存中运行。如果下游不接受新元素,则上游不会产生新元素。

例如:

case class Record(id: Int)
Source.fromIterator(() => 
    Iterator
      .fill(1000)(Iterator(1,2).map { n => println("creating"); Record(n) })
      .flatten)
  .groupBy(maxSubstreams = 2, _.id)
  .map { r => println("Consuming"); r }
  .fold(0)((acc, _) => acc + 1)
  .mergeSubstreams
  .runForeach(println)

将向您展示Record 实例是如何在两个子流中的每一个子流中尽可能快地生成的,而不是全部预先生成。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-04-13
    • 1970-01-01
    • 2015-07-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-06
    相关资源
    最近更新 更多