【问题标题】:Read lines reactively with Spring WebClient使用 Spring WebClient 响应式读取行
【发布时间】:2019-07-23 10:29:01
【问题描述】:

TLDR:如何使用 Spring WebClient(反应式)逐行处理 GET 响应?

详情:

  • 远程服务器返回最大 20 Gb 的响应
  • 我的服务单独解析行(每行编码为 UTF8)并流式传输结果(跳过 99% 的行)
  • 我不想将整个响应加载到内存中,例如我想逐行解析服务器更新。

不幸的是,我没有找到任何将Flux<ByteBuffer> 转换为Flux<String> 的解决方案(通过在行尾拆分)。

问题:是否有任何嵌入式转换器/解码器可以做到这一点?

可能的解决方案:

  • 创建临时缓冲区(最初为空)
  • 对于每个输入缓冲区:
    • 将临时缓冲区添加到新的、重新创建的临时缓冲区中。
    • 尝试从此缓冲区读取单行(例如,读取到行尾):
    • 如果有剩余字节 - 返回此字符串并重复行读取
    • 如果缓冲区已完成(例如,那里没有行分隔符):只需将这些字节复制到临时缓冲区。
  • 在最后一个缓冲区之后:读取临时缓冲区直到结束。

另外:您不能只将输入缓冲区转换为字符串,因为某些 utf8 字符可以从缓冲区 N 开始并在缓冲区 N+1 处继续。

【问题讨论】:

    标签: java spring http streaming webclient


    【解决方案1】:

    下面的代码可以工作,但是这是完全同步的代码(可能只有预取能力)。它使用Apache Http Components

    HttpClientBuilder.create().build().use { client ->
        val responseHandler = ResponseHandler { response ->
            response.entity.content.use { content ->
                content.bufferedReader().use { buffered ->
                    // create class, which can process each line. 
                    val processor = StreamedLinesProcessor<TResult>()
    
                    do {
                        val nextLine = buffered.readLine()
                        val needContinue = processor.processNextLine(nextLine)
                    } while (needContinue)
                    processor.getResult()
                }
            }
        }
    
        client.execute(HttpGet(url.toString()), responseHandler)
    }
    

    【讨论】:

      猜你喜欢
      • 2021-09-17
      • 2021-01-08
      • 2017-10-25
      • 2020-11-24
      • 2021-08-24
      • 2020-05-20
      • 2020-10-30
      • 2021-11-15
      • 2019-09-15
      相关资源
      最近更新 更多