【Question title】: How to aggregate elements of one Akka stream based on elements of another?
【Posted】: 2016-07-06 16:12:17
【Question description】:

Example scenario: group the bytes of one stream into chunks whose sizes are determined by another stream (of integers).

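import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// `aggregate` receives a function that grabs the next n elements of `first`,
// together with the current element of `second`
def partition[A, B, C](
  first: Source[A, NotUsed],
  second: Source[B, NotUsed],
  aggregate: (Int => Seq[A], B) => C
): Source[C, NotUsed] = ???

val bytes: Source[Byte, NotUsed] = ???
val sizes: Source[Int, NotUsed] = ???

// explicit parameter types are needed: Scala 2 cannot infer them while A and B are still unknown
val chunks: Source[ByteString, NotUsed] =
  partition(bytes, sizes, (grab: Int => Seq[Byte], count: Int) => ByteString(grab(count): _*))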
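To pin down the intended semantics, here is a hypothetical in-memory model of `partition` for the example scenario, working on plain collections rather than Akka streams. The names `ChunkModel` and `chunk` are illustrative only; the sketch assumes non-negative sizes and drops an incomplete trailing chunk, which is one possible choice rather than something the question specifies.

```scala
// Hypothetical in-memory model of the chunking semantics (not an Akka API).
object ChunkModel {
  /** Splits `elems` into consecutive chunks whose lengths are taken from `sizes`.
    * Assumes non-negative sizes; stops when `sizes` runs out or when fewer
    * elements remain than the next size asks for (that partial chunk is dropped). */
  def chunk[A](elems: Seq[A], sizes: Seq[Int]): List[Seq[A]] = {
    @annotation.tailrec
    def loop(rest: Seq[A], remainingSizes: List[Int], acc: List[Seq[A]]): List[Seq[A]] =
      remainingSizes match {
        case n :: tail if n >= 0 && rest.length >= n =>
          // take the next n elements as one chunk and continue with the remainder
          val (head, remainder) = rest.splitAt(n)
          loop(remainder, tail, head :: acc)
        case _ => acc.reverse
      }
    loop(elems, sizes.toList, Nil)
  }
}
```

For example, `ChunkModel.chunk(1 to 7, Seq(2, 3, 5))` yields the chunks 1, 2 and 3, 4, 5; the size 5 is not satisfied because only 6 and 7 remain.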

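My initial attempts involved a combination of Flow#scan and Flow#prefixAndTail, but it doesn't feel quite right (see below). I also looked at Framing, but it doesn't seem applicable to the example scenario above (nor is it general enough to handle streams of anything other than ByteString). I'm guessing my only option is to use Graphs (or the more general FlowOps#transform), but I'm not yet proficient enough with Akka streams to attempt that.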


Here is what I have come up with so far (specific to the example scenario):

val chunks:Source[ByteString, NotUsed] = sizes
  .scan(bytes prefixAndTail 0) {
    (grouped, count) => grouped flatMapConcat {
      case (chunk, remainder) => remainder prefixAndTail count
    }
  }
  .flatMapConcat(identity)
  .collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) }

【Discussion】:

    Tags: scala stream akka reactive-programming akka-stream


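    【Solution 1】:

    I think you can implement the processing as a custom GraphStage. The stage has two Inlets: one takes the bytes, the other takes the sizes. It has one Outlet that produces the resulting chunks.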

    Consider the following input streams.

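    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import scala.util.Random
    
    def randomChars: Iterator[Char] = Iterator.continually(Random.nextPrintableChar())
    // sizes in the range 0..49
    def randomNumbers: Iterator[Int] = Iterator.continually(math.abs(Random.nextInt() % 50))
    
    val bytes: Source[Char, NotUsed] =
      Source.fromIterator(() => randomChars)
    
    // zero-sized frames are filtered out: the stage below only handles positive sizes
    val sizes: Source[Int, NotUsed] =
      Source.fromIterator(() => randomNumbers).filter(_ != 0)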
    

    Then, using the documentation on custom stream processing (http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html), you can construct the GraphStage:

    import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
    import akka.util.ByteString
    
    case class ZipFraming() extends GraphStage[FanInShape2[Int, Char, (Int, ByteString)]] {
    
      override def initialAttributes = Attributes.name("ZipFraming")
    
      override val shape: FanInShape2[Int, Char, (Int, ByteString)] =
        new FanInShape2[Int, Char, (Int, ByteString)]("ZipFraming")
    
      val inFrameSize: Inlet[Int] = shape.in0
      val inElements: Inlet[Char] = shape.in1
    
      def out: Outlet[(Int, ByteString)] = shape.out
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // buffer at most 512 characters from the input
          // (a frame size above this limit can never be satisfied)
          val MaxBufferSize = 512
          // the buffer for the received chars
          var buffer = Vector.empty[Char]
          // the size of the current frame (assumed positive), or -1 if none is known yet
          var needed: Int = -1
          // whether the downstream is waiting for an element
          var isDemanding = false
    
          override def preStart(): Unit = {
            pull(inFrameSize)
            pull(inElements)
          }
    
          setHandler(inElements, new InHandler {
            override def onPush(): Unit = {
              // always grab the pushed element so it is never lost,
              // but only request more while the buffer has room
              buffer = buffer :+ grab(inElements)
              if (buffer.size < MaxBufferSize) pull(inElements)
              tryEmit()
            }
          })
    
          setHandler(inFrameSize, new InHandler {
            override def onPush(): Unit = {
              needed = grab(inFrameSize)
              tryEmit()
            }
          })
    
          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              isDemanding = true
              tryEmit()
            }
          })
    
          // push one frame once its size is known, enough chars are buffered
          // and the downstream has asked for an element
          def tryEmit(): Unit = {
            if (needed > 0 && buffer.length >= needed && isDemanding) {
              val (frame, remainder) = buffer.splitAt(needed)
              push(out, (needed, ByteString(frame.map(_.toByte).toArray)))
              buffer = remainder
              needed = -1
              isDemanding = false
              pull(inFrameSize)
              // the buffer has shrunk, so resume reading if it had been full
              if (!hasBeenPulled(inElements)) pull(inElements)
            }
          }
        }
    }
    

    This is how you run it:

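    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ClosedShape}
    import akka.stream.scaladsl.{GraphDSL, Keep, RunnableGraph, Sink}
    import akka.util.ByteString
    
    implicit val system: ActorSystem = ActorSystem("ZipFraming")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    
    // both sources are infinite, so this keeps printing until the system is stopped
    RunnableGraph.fromGraph(GraphDSL.create(bytes, sizes)(Keep.none) { implicit b =>
      (bs, ss) =>
        import GraphDSL.Implicits._
    
        val zipFraming = b.add(ZipFraming())
    
        // frame sizes go to in0, characters to in1
        ss ~> zipFraming.in0
        bs ~> zipFraming.in1
    
        zipFraming.out ~> Sink.foreach[(Int, ByteString)](e => println((e._1, e._2.utf8String)))
    
        ClosedShape
    }).run()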
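    For comparison, the same chunking can probably be expressed with built-in operators instead of a custom stage: expand each size n into n flags, zip the flags with the data, and cut a new substream after each flag that marks the end of a chunk. This is a sketch, not part of the original answer; `ZipSplitChunking` and `chunks` are hypothetical names, it assumes positive sizes and Akka 2.4/2.5-era APIs (`ActorMaterializer`), and unlike `ZipFraming` it emits a shorter final chunk when `bytes` ends in the middle of a chunk.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper built only from standard Akka Streams operators.
object ZipSplitChunking {
  /** Groups `bytes` into chunks whose lengths come from `sizes` (assumed positive). */
  def chunks(bytes: Source[Byte, NotUsed], sizes: Source[Int, NotUsed]): Source[ByteString, NotUsed] = {
    // each size n becomes n flags; only the last flag of a chunk is `true`
    val endOfChunk: Source[Boolean, NotUsed] =
      sizes.mapConcat(n => List.fill(n - 1)(false) :+ true)
    bytes
      .zip(endOfChunk)                  // pair every byte with its flag
      .splitAfter(_._2)                 // close the current substream after a `true` flag
      .fold(Vector.empty[Byte])((acc, pair) => acc :+ pair._1)
      .map(chunk => ByteString(chunk.toArray))
      .concatSubstreams                 // substreams run one after another, so order is kept
  }
}
```

    Because `splitAfter` only ever has one open substream, `concatSubstreams` is used to keep the chunks in order. For example, running it on the bytes of "abcdefg" with sizes 2, 3, 5 should give the chunks "ab", "cde" and the partial chunk "fg".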
    

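    【Comments】:

    • The problem with this approach is that `size` characters will always be picked from the front of `bytes`, so the elements of the resulting stream will all share the same prefix.
    • @Andrey You're right. I've updated my answer to actually implement a GraphStage that acts as something between Zip and Framing.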