How to write an enumeratee to chunk an enumerator along different boundaries
Posted: 2012-05-07 23:07:07
Question:

So, the Play 2.0 Enumeratee page shows an example of using the &> or through method to turn an Enumerator[String] into an Enumerator[Int]:

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val ints: Enumerator[Int] = strings &> toInt

There is also an Enumeratee.grouped enumeratee for building an enumerator of chunks out of individual elements. That seems to work fine.

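But what I see is that the usual input comes in the form of Array[Byte] (which is what Enumerator.fromFile and Enumerator.fromStream return). With that in mind, I would like to turn those Array[Byte] inputs into an Enumerator[String] where, for instance, each string is a line (terminated by '\n'). The boundaries of the lines and of the Array[Byte] elements usually won't match. How can I write an enumeratee that converts the chunked arrays into chunked strings?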

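The goal is to stream those lines back to the browser as each Array[Byte] becomes available, holding on to any leftover bytes that are not part of a complete line until the next input chunk arrives.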
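To make the "hold on to the leftover bytes" idea concrete outside of Play's iteratee machinery, here is a minimal, library-free sketch in plain Scala (LineSplitter.feed is a hypothetical name, not a Play API). Each call takes the bytes withheld from the previous chunk plus the newly arrived chunk, returns every complete '\n'-terminated line, and hands back the unterminated tail to be prepended to the next chunk. Splitting on the raw '\n' byte before decoding is safe for UTF-8, because that byte never occurs inside a multi-byte sequence; a partially received character simply stays in the leftover.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, not part of Play: one step of "split into lines,
// keep the remainder for later".
object LineSplitter {
  /** Returns the complete lines found in `leftover ++ chunk`, plus the bytes
    * after the last '\n', which must be kept until the next chunk arrives. */
  def feed(leftover: Array[Byte], chunk: Array[Byte]): (Vector[String], Array[Byte]) = {
    val bytes  = leftover ++ chunk
    val lastNl = bytes.lastIndexOf('\n'.toByte)
    if (lastNl < 0) (Vector.empty, bytes) // no complete line yet: keep everything
    else {
      // Everything before the last '\n' consists of complete lines;
      // limit -1 keeps empty lines such as the one in "a\n\nb".
      val lines = new String(bytes, 0, lastNl, UTF_8).split("\n", -1).toVector
      (lines, bytes.drop(lastNl + 1))
    }
  }
}
```

Feeding "bippy" and then "foo\nbar\nba" yields no lines for the first chunk, then "bippyfoo" and "bar" for the second, leaving "ba" as the leftover.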

Ideally, I would like a method that, given an iter: Iteratee[Array[Byte], T] and an Enumerator[Array[Byte]], returns an Enumerator[T] whose T elements were parsed by iter.
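One way to picture that signature without Play is a pull-based analog over scala.collection.Iterator. In this sketch the hypothetical Parser[T] type stands in for the Iteratee: given the buffered bytes, it either returns one parsed element plus the unconsumed bytes, or None when it needs more input. Note the swap: Play's Enumerator pushes chunks, while an Iterator is pulled, so this only illustrates the buffering logic, not the Enumeratee API. It also assumes that a parser which succeeds always consumes at least one byte; otherwise the iterator would emit forever.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, library-free analog of "Enumerator[Array[Byte]] + parser => Enumerator[T]".
object ChunkBy {
  /** Stand-in for an Iteratee: Some((element, unconsumedBytes)) or None if more input is needed. */
  type Parser[T] = Array[Byte] => Option[(T, Array[Byte])]

  def chunkBy[T](chunks: Iterator[Array[Byte]], parse: Parser[T]): Iterator[T] =
    new Iterator[T] {
      private var buffer: Array[Byte] = Array.emptyByteArray // bytes not yet parsed
      private var ready: Option[T] = None                    // next element, if parsed

      // Parse one element, pulling more chunks into the buffer as needed.
      private def fill(): Unit = {
        var done = ready.isDefined
        while (!done) {
          parse(buffer) match {
            case Some((t, rest)) =>
              ready = Some(t); buffer = rest; done = true
            case None if chunks.hasNext =>
              buffer = buffer ++ chunks.next()
            case None =>
              done = true // input exhausted; unparsable leftover bytes are dropped
          }
        }
      }

      def hasNext: Boolean = { fill(); ready.isDefined }

      def next(): T = {
        fill()
        val t = ready.getOrElse(throw new NoSuchElementException("chunkBy: no more elements"))
        ready = None
        t
      }
    }

  /** Example parser: one '\n'-terminated line, terminator discarded. */
  val newlineParser: Parser[String] = { buf =>
    val i = buf.indexOf('\n'.toByte)
    if (i < 0) None else Some((new String(buf, 0, i, UTF_8), buf.drop(i + 1)))
  }
}
```

For example, ChunkBy.chunkBy(Iterator("bippy", "foo\nbar\nba", "z\n").map(_.getBytes("UTF-8")), ChunkBy.newlineParser).toList gives List("bippyfoo", "bar", "baz"). Dropping whatever cannot be parsed at end of input is a design choice; a real implementation might instead surface it as an error or a final partial element.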

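Additional info: I have had some time to clean up my code, so here is a concrete example of what I am trying to do. I have the following iteratee that detects the next line: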

import play.api.libs.iteratee._
type AB = Array[Byte]

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = {
  def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match {
    case Input.EOF => Done(acc, Input.EOF)
    case Input.Empty => Cont(step(_, acc))
    case Input.El(arr) =>
      val (taking, rest) = arr.span(pred)
      if (rest.length > 0) Done(acc ++ taking, Input.El(rest)) 
      else Cont(step(_, acc ++ taking)) 
  }
  Cont(step(_, Array()))
}

val line = for {
  bytes <- takeWhile(b => !(b == '\n' || b == '\r'))
  _     <- takeWhile(b =>   b == '\n' || b == '\r')
} yield bytes
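Within a single chunk, the two takeWhile steps of line boil down to a span followed by a dropWhile on the array. The in-memory sketch below (LineOnArray is a hypothetical name, and it ignores chunk boundaries entirely) shows why a run such as "\n\r\n\r" produces no empty lines: the second step swallows the whole run of terminators.

```scala
// Hypothetical in-memory analog of the `line` iteratee for one fully buffered array.
object LineOnArray {
  private def isEol(b: Byte): Boolean = b == '\n'.toByte || b == '\r'.toByte

  /** Returns (line bytes, bytes remaining after the run of terminators). */
  def line(arr: Array[Byte]): (Array[Byte], Array[Byte]) = {
    val (bytes, rest) = arr.span(b => !isEol(b)) // first takeWhile: up to '\n' or '\r'
    val after = rest.dropWhile(isEol)            // second takeWhile: skip the whole run
    (bytes, after)
  }
}
```

When the array holds no terminator, this version returns the whole array as the line, whereas the iteratee version takes the Cont branch and waits for more input.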

And what I would like to write is something like this:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain")

Tags: scala playframework enumerator playframework-2.0 iterate


    Answer 1:

    https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 does something similar to what you are doing. I fixed grouped so that it handles leftover input. The code basically looks like this:

    val upToNewLine = 
      Traversable.splitOnceAt[String,Char](_ != '\n')  &>>
      Iteratee.consume()
    
    Enumeratee.grouped(upToNewLine)
    

    I also had to fix repeat in the same way.

      Answer 2:

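      Here is what I came up with after a few hours of experimenting. I hope someone can come up with a more elegant implementation, because I can barely understand mine.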

      def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] {
        def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = {
          def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]):
                Iteratee[AB, Iteratee[AB, A]] = {
            e match {
              case Input.EOF =>
                // if we have a leftover and it's a chunk, then output it
                leftover match {
                  case Input.EOF | Input.Empty => Done(in, leftover)
                  case Input.El(_) =>
                    val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover)
                      >>> Enumerator.eof |>> chunker)
                    lastChunk.pureFlatFold(
                      done = { (chunk, rest) =>
                        val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in)
                        nextIn.pureFlatFold(
                          done = (a, e2) => Done(nextIn, e2),
                          // nothing more will come
                          cont = k => Done(nextIn, Input.EOF),
                          error = (msg, e2) => Error(msg, e2))
                      },
                      // not enough content to get a chunk, so drop content
                      cont = k => Done(in, Input.EOF),
                      error = (msg, e2) => Error(msg, e2))
                }
              case Input.Empty => Cont(step(_, in, leftover))
              case Input.El(arr) =>
                // feed through chunker
                val iChunks = Iteratee.flatten(
                  Enumerator.enumInput(leftover)
                    >>> Enumerator(arr)
                    >>> Enumerator.eof // to extract the leftover
                    |>> repeat(chunker))
                iChunks.pureFlatFold(
                  done = { (chunks, rest) =>
                    // we have our chunks, feed them to the inner iteratee
                    val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in)
                    nextIn.pureFlatFold(
                      done = (a, e2) => Done(nextIn, e2),
                      // inner iteratee needs more data
                      cont = k => Cont(step(_: Input[AB], nextIn,
                        // we have to ignore the EOF we fed to repeat
                        if (rest == Input.EOF) Input.Empty else rest)),
                      error = (msg, e2) => Error(msg, e2))
                  },
                  // not enough content to get a chunk, continue
                  cont = k => Cont(step(_: Input[AB], in, leftover)),
                  error = (msg, e2) => Error(msg, e2))
            }
          }
          Cont(step(_, inner, Input.Empty))
        }
      }
      

      And here is the definition of my custom repeat:

      // withhold the last chunk so that it may be concatenated with the next one
      def repeat(chunker: Iteratee[AB, AB]) = {
        def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
              leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match {
          case Input.EOF => ch.pureFlatFold(
            done = (a, e) => Done(acc, leftover),
            cont = k => k(Input.EOF).pureFlatFold(
              done = (a, e) => Done(acc, Input.El(a)),
              cont = k => sys.error("divergent iter"),
              error = (msg, e) => Error(msg, e)),
            error = (msg, e) => Error(msg, e))
          case Input.Empty => Cont(loop(_, ch, acc, leftover))
          case Input.El(_) =>
            val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
                >>> Enumerator.enumInput(e) |>> ch)
            i.pureFlatFold(
              done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty),
              cont = k => Cont(loop(_, i, acc, Input.Empty)),
              error = (msg, e) => Error(msg, e))
        }
        Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty))
      }
      

      This works on a few samples, including this one:

       val source = Enumerator(
         "bippy".getBytes,
         "foo\n\rbar\n\r\n\rbaz\nb".getBytes,
         "azam\ntoto\n\n".getBytes)
       Ok.stream(source 
         &> chunkBy(line) 
         &> Enumeratee.map(l => l ++ ".\n".getBytes)
       ).as("text/plain")
      

      which prints:

      bippyfoo.
      bar.
      baz.
      bazam.
      toto.
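As a cross-check of that output, here is a plain-Scala sketch (hypothetical names, no Play dependency) that applies the same rule as line, treating any run of '\n'/'\r' as a single terminator, while processing the sample chunk by chunk. The only state carried between chunks is the bytes of the unfinished line and whether the previous byte was a terminator, which is the "leftover" an enumeratee has to thread through; because that state persists, a terminator run split across two chunks is still treated as one. An unterminated last line is flushed at end of input, similar to what the EOF branch of chunkBy attempts.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical state-machine analog of chunkBy(line); not a Play API.
object LineState {
  private def isEol(b: Byte): Boolean = b == '\n'.toByte || b == '\r'.toByte

  /** Carried between chunks: bytes of the unfinished line, and whether
    * the last byte seen was a line terminator. */
  final case class State(pending: Vector[Byte], inEolRun: Boolean)
  val initial: State = State(Vector.empty, inEolRun = false)

  /** Consumes one chunk; returns the lines it completed and the new state. */
  def feed(state: State, chunk: Array[Byte]): (Vector[String], State) =
    chunk.foldLeft((Vector.empty[String], state)) { case ((out, State(pending, inRun)), b) =>
      if (!isEol(b)) (out, State(pending :+ b, inEolRun = false))
      else if (inRun) (out, State(pending, inEolRun = true)) // rest of a terminator run
      else (out :+ new String(pending.toArray, UTF_8), State(Vector.empty, inEolRun = true))
    }

  /** At end of input, flush an unterminated last line, if any. */
  def finish(state: State): Vector[String] =
    if (state.pending.isEmpty) Vector.empty
    else Vector(new String(state.pending.toArray, UTF_8))

  def run(chunks: Seq[Array[Byte]]): Vector[String] = {
    val (lines, last) = chunks.foldLeft((Vector.empty[String], initial)) {
      case ((acc, st), chunk) =>
        val (ls, next) = feed(st, chunk)
        (acc ++ ls, next)
    }
    lines ++ finish(last)
  }
}
```

On the three sample chunks it returns Vector("bippyfoo", "bar", "baz", "bazam", "toto"), matching the lines printed above without the appended ".".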
      
