【问题标题】:Akka HTTP Streaming JSON DeserializationAkka HTTP 流式 JSON 反序列化
【发布时间】:2026-01-21 09:00:01
【问题描述】:

是否可以动态地将来自 Akka HTTP 的 外部 长度未知的 ByteString 流反序列化到域对象中?


上下文

我调用一个 infinitely 长的HTTP 端点,它输出一个不断增长的JSON Array

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight

【问题讨论】:

  • 为了澄清,您是尝试接收此 JSON 流还是广播此流?如果广播,你的内部表示是什么(例如迭代器,scala 流,......)?此外,通信必须是数组还是可以是单个域对象的流?
  • @RamonJRomeroyVigil 此流将完全是外部的。
  • 在您的特定情况下,您可以等待关闭 } 并调用您选择的反序列化器以获取中间文本。这需要一些操作,并且可能在 ByteString 上进行缓冲,但它们非常基本。
  • 你真的需要 JSON 数组吗?对 JSON 文档使用不同的分隔符(例如换行符)将使这项任务更容易。见doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/…
  • @PavelKudinov 你有这方面的例子吗?

标签: json akka akka-stream akka-http


【解决方案1】:

我想在这种情况下应该使用JsonFraming.objectScanner(Int.MaxValue)。正如文档所述:

返回一个实现基于“大括号计数”的框架的流 用于发出有效 JSON 块的运算符。它扫描传入的数据 流以获取有效的 JSON 对象并返回 ByteStrings 块 仅包含那些有效的块。一个典型的数据示例 可能要使用此运算符的框架包括:非常大的数组

所以你可以得到这样的结果:

val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(_.utf8String)         // In case you have ByteString
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)
      .runWith(Sink.ignore)      // Do whatever you need here 
  case Failure(exception) => log.error(exception, "Api call failed")
}

【讨论】:

    【解决方案2】:

    我在尝试将 Twitter 流(无限字符串)解析为域对象时遇到了非常相似的问题。 我使用Json4s 解决了它,如下所示:

    case class Tweet(username: String, geolocation: Option[Geo])
    case class Geo(latitude: Float, longitude: Float)
    object Tweet{
        def apply(s: String): Tweet = {
            parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
        }
    }
    

    然后我只是缓冲流并将其映射到推文:

    val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
    var line = reader.readLine()
    while(line != null){
        store(Tweet.apply(line))
        line = reader.readLine()
    }
    

    Json4s 完全支持 Option(或对象内部的自定义对象,例如示例中的 Geo)。因此,你可以像我一样放一个Option,如果Json中没有这个字段,就会设置为None。

    希望对你有帮助!

    【讨论】:

      【解决方案3】:

      我认为play-iteratees-extras 必须帮助你。这个库允许通过 Enumerator/Iteratee 模式解析 Json,当然,不要等待接收所有数据。

      例如,避免构建代表“无限”Json 数组的“无限”字节流。

      import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
      
      var i = 0
      var isFirstWas = false
      
      val max = 10000
      
      val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
        Future {
          i += 1
          if (i < max) {
            val json = Json.stringify(Json.obj(
              "prop" -> Random.nextBoolean(),
              "prop2" -> Random.nextBoolean(),
              "prop3" -> Random.nextInt(),
              "prop4" -> Random.alphanumeric.take(5).mkString("")
            ))
      
            val string = if (isFirstWas) {
              "," + json
            } else {
              isFirstWas = true
              json
            }
      
      
            Some(Codec.utf_8.encode(string))
          } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
          else None
      
        }
      }
      

      好的,这个值包含 10000 个(或更多)对象的 jsArray。让我们定义将包含我们数组中每个对象的数据的案例类。

      case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)
      

      现在编写解析器,它将解析每个项目

      import play.extras.iteratees._    
      import JsonBodyParser._
      import JsonIteratees._
      import JsonEnumeratees._
      
      val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
        for {
          prop <- json.\("prop").asOpt[Boolean]
          prop2 <- json.\("prop2").asOpt[Boolean]
          prop3 <- json.\("prop3").asOpt[Int]
          prop4 <- json.\("prop4").asOpt[String]
        } yield Props(prop, prop2, prop3, prop4)
      }
      

      请参阅doc 了解jsArrayjsValuesjsSimpleObject。构建结果生成器:

      val result = stream &> Encoding.decode() ><> parser
      

      来自 JsonIteratees 包的Encoding.decode() 将字节解码为CharStringresult 值具有 Enumerator[Option[Item]] 类型,您可以将一些迭代器应用于此枚举器以开始解析过程。

      总的来说,我不知道您如何接收字节(解决方案在很大程度上取决于此),但我认为这显示了您的问题的可能解决方案之一。

      【讨论】:

        最近更新 更多