【问题标题】:Reading json stream without blocking读取json流而不阻塞
【发布时间】:2021-05-12 22:44:03
【问题描述】:

我希望能够使用 Jackson (2) 读取 json 消息流(从套接字)。

有一些方法可以将Reader作为源传递,例如:

ObjectMapper mapper = new ObjectMapper();
MyObject obj = mapper.readValue(aReader, MyObject.class);

但这会一直阻塞,直到整个 json 消息到达,我想避免这种情况。

有没有办法拥有一个缓冲区,我可以继续向其中添加字节,并能够询问缓冲区是否包含特定类的完整 json 表示?
比如:

JsonBuffer buffer = new JsonBuffer(MyObject.class);
...
buffer.add(readBytes);
if (buffer.hasObject()) {
    MyObject obj = buffer.readObject();
}

谢谢。

【问题讨论】:

    标签: java jackson nonblocking


    【解决方案1】:

    Jackson 支持非阻塞 JSON 流解析 as of 2.9。您可以在 Spring Framework 5 Jackson2Tokenizer 中找到有关如何使用它的示例。

    【讨论】:

      【解决方案2】:

      (我知道这个帖子很旧,但由于没有公认的答案,我想添加我的,以防万一有人还在读这个)。

      我刚刚发布了一个名为 Actson (https://github.com/michel-kraemer/actson) 的新库。它的工作原理几乎与 OP 建议的一样。您可以向它提供字节,直到它返回一个或多个 JSON 事件。当它消耗完所有输入数据后,您向它提供更多字节并获取下一个 JSON 事件。这个过程一直持续到 JSON 文本被完全使用为止。

      如果您了解 Aalto XML (https://github.com/FasterXML/aalto-xml),那么您应该能够快速熟悉 Actson,因为界面几乎相同。

      这是一个简单的例子:

      // JSON text to parse
      byte[] json = "{\"name\":\"Elvis\"}".getBytes(StandardCharsets.UTF_8);
      
      JsonParser parser = new JsonParser(StandardCharsets.UTF_8);
      
      int pos = 0; // position in the input JSON text
      int event; // event returned by the parser
      do {
          // feed the parser until it returns a new event
          while ((event = parser.nextEvent()) == JsonEvent.NEED_MORE_INPUT) {
              // provide the parser with more input
              pos += parser.getFeeder().feed(json, pos, json.length - pos);
      
              // indicate end of input to the parser
              if (pos == json.length) {
                  parser.getFeeder().done();
              }
          }
      
          // handle event
          System.out.println("JSON event: " + event);
          if (event == JsonEvent.ERROR) {
              throw new IllegalStateException("Syntax error in JSON text");
          }
      } while (event != JsonEvent.EOF);
      

      【讨论】:

        【解决方案3】:

        您可以使用 JsonParser 获取单个事件/令牌(这是 ObjectMapper 在内部使用的),这允许更精细的访问。但是目前所有的功能都使用阻塞 IO,所以没有办法进行所谓的非阻塞(又名“异步”)解析。

        编辑:2019-09-18 -- 更正:Jackson 2.9 (https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.9) 添加了对非阻塞/异步 JSON 解析的支持(问题 https://github.com/FasterXML/jackson-core/issues/57

        【讨论】:

        • 谢谢,但我确实在寻找异步的做事方式。我设法找到了解决方法并添加了自己的答案。
        • 是的,正要提到带有长度前缀的外部框架可能是要走的路……但我想你可能已经知道这种可能性了。
        • 我一开始是直接通过jackson做的,但是当看到它只实现阻塞IO时,我开始考虑其他选择。
        • 阻塞 IO 是要走的路——我所知道的 Java JSON 解析器都不支持推送式解析。很少有其他人(对于 XML,Aalto 是这样)。非阻塞是一个主要的 PITA,我在写了几件事后说这个。 NIO 使用起来很痛苦,但除此之外,当结合压缩、身份验证等内容时,事情会变得非常复杂。
        • 我发现 Netty 让 NIO 的痛苦减轻了很多,但我同意,这并不总是正确的方法,并且通过身份验证我使用阻塞 IO,但对于通常的流我需要实现 NIO只是表现得更好。
        【解决方案4】:

        这不是我问题的答案,而是我想出的解决方法。

        我没有在 Jackson 方面处理非阻塞 IO,而是在我的协议中实现了它。
        所有 json 消息在发送时都用一个 4 字节的 int 填充,它保存消息其余部分的长度。
        现在读取 json 消息变得很容易,我只需找出长度,异步读取它,然后可以将 Jackson 与结果字符串一起使用。

        如果有人知道如何直接从杰克逊那里做到这一点,我仍然很想知道。

        【讨论】:

          【解决方案5】:

          我为这个问题找到了一些可行的解决方案。您可以使用方法inputStream.available 来检查流中是否有一些字节,并且此方法也是非阻塞的。因此,您可以使用此方法检查是否存在某些内容 - 解析值,如果没有 - 等待一段时间再检查。下面显示了两个示例。

          安全风格 - 检查 START_OBJECT json 令牌:

          while (run.get()) { 
              if (inputStream.available() > 0) {
                  for (JsonToken jsonToken; (null != (jsonToken = jsonParser.nextToken())); ) {
                      if (JsonToken.START_OBJECT.equals(jsonToken)) {
                          outputStream.onNext(jsonParser.readValueAs(tClass));
                          break;
                      }
                  }
              } else {
                  Thread.sleep(200); // Or can be another checking time.
              }
          }
          

          或最简单的风格:

          while (run.get()) {
              if (inputStream.available() > 0) {
                  outputStream.onNext(jsonParser.readValueAs(tClass));
              } else {
                  Thread.sleep(200); // Or can be another checking time.
              }
          }
          

          【讨论】:

            【解决方案6】:

            Gson怎么用?

            private fun handleActions(webSocketMessage: WebSocketMessage, webSocketSession: WebSocketSession): Mono<WebSocketMessage> {
                val action = Gson().fromJson(webSocketMessage.payloadAsText, ActionWs::class.java)
                return when (action.action) {
                    "REGISTER" -> companyService.createCompany(action.company)
                        .map { webSocketSession.textMessage(jacksonObjectMapper().writeValueAsString(it)) }
                    else -> Mono.just(webSocketSession.textMessage(Gson().toJson(CompanyInfo(0, 0, 0, "ACAO INVALIDA!"))))
                }
            }
            

            【讨论】:

              猜你喜欢
              • 2012-08-16
              • 2017-04-07
              • 2022-01-02
              • 2011-06-25
              • 1970-01-01
              • 2020-09-18
              • 2018-07-26
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多