【问题标题】:Content GZIP decompression for Apache Http Async Client(Apache Http 异步客户端的内容 GZIP 解压缩)
【发布时间】:2020-07-13 18:01:10
【问题描述】:

在经典的 HttpClient 中,GZIP 解压缩由 ContentCompressionExec 开箱即用地处理。使用 HttpAsyncClient 时该如何实现?我找不到任何实现此功能的 AsyncExecChainHandler。

【问题讨论】:

    标签: apache-httpcomponents apache-httpasyncclient apache-httpclient-5.x


    【解决方案1】:

    我最终用 Scala 实现了以下 AsyncResponseConsumer:

    /**
     * Response consumer that assembles a [[SimpleHttpResponse]] from the response head and
     * the byte array produced by the supplied entity consumer.
     *
     * This class performs no decoding itself; whatever transformation is applied to the body
     * is the responsibility of `entityConsumer`.
     *
     * @param entityConsumer consumer that collects the response body as a byte array
     */
    class SimpleDecompressingResponseConsumer(val entityConsumer: AsyncEntityConsumer[Array[Byte]])
      extends AbstractAsyncResponseConsumer[SimpleHttpResponse, Array[Byte]](entityConsumer) {

      /** Intermediate (informational) responses are intentionally ignored. */
      override def informationResponse(response: HttpResponse, context: HttpContext): Unit = {}

      /**
       * Copies the response head into a [[SimpleHttpResponse]] and attaches the body when present.
       *
       * @param response    response head to copy
       * @param entity      collected body bytes; `null` when there is no body to attach
       * @param contentType content type to associate with the body
       * @return the assembled response
       */
      override protected def buildResult(response: HttpResponse, entity: Array[Byte], contentType: ContentType): SimpleHttpResponse = {
        val result = SimpleHttpResponse.copy(response)
        // Option(null) is None, so the body is only set for a non-null entity.
        Option(entity).foreach(body => result.setBody(body, contentType))
        result
      }
    }
    
    /**
     * Entity consumer that buffers the whole response body in memory and, once the stream
     * has ended, decodes it according to the entity's Content-Encoding.
     *
     * Recognised codings are `gzip`, `x-gzip` and `deflate` (matched case-insensitively);
     * any other or absent coding leaves the bytes untouched. Only a single coding value is
     * recognised; a comma-separated list of codings falls through to the pass-through case.
     *
     * Nothing is decoded until the complete body has arrived, so this is intended for
     * small payloads.
     */
    class SimpleAsyncDecompressingEntityConsumer extends AbstractBinDataConsumer with AsyncEntityConsumer[Array[Byte]] {
      @volatile
      private var resultCallback: FutureCallback[Array[Byte]] = _
      // Starts as a pass-through so completed() cannot hit a null function
      // if streamStart() was never invoked.
      private var encoding: Array[Byte] => Array[Byte] = identity
      private var content: Array[Byte] = _

      private val buffer = new ByteArrayBuffer(1024)

      /**
       * Builds a decoder that wraps the raw bytes in the given decompressing stream,
       * drains it fully and always closes it afterwards.
       */
      private def decodeWith(wrap: java.io.InputStream => java.io.InputStream): Array[Byte] => Array[Byte] =
        bytes => {
          val in = wrap(new ByteArrayInputStream(bytes))
          try IOUtils.toByteArray(in)
          finally in.close() // releases the inflater instead of waiting for finalization
        }

      /**
       * Remembers the callback and selects the decoder from the entity's Content-Encoding.
       *
       * @param entityDetails  details of the incoming entity; a `null` value or a `null`
       *                       content encoding selects the pass-through decoder
       * @param resultCallback callback notified from `completed()` / `failed()`
       */
      override def streamStart(entityDetails: EntityDetails, resultCallback: FutureCallback[Array[Byte]]): Unit = {
        this.resultCallback = resultCallback
        // Content-coding names are case-insensitive, so normalise before matching.
        val coding = Option(entityDetails)
          .flatMap(details => Option(details.getContentEncoding))
          .map(_.trim.toLowerCase(java.util.Locale.ROOT))
        this.encoding = coding match {
          case Some("gzip") | Some("x-gzip") => decodeWith(new GZIPInputStream(_))
          case Some("deflate")               => decodeWith(new DeflateInputStream(_))
          case _                             => identity
        }
      }

      /** Reports the failure to the callback (if one was registered) and drops buffered data. */
      override def failed(cause: Exception): Unit = {
        if (resultCallback != null) resultCallback.failed(cause)
        releaseResources()
      }

      /** @return the decoded body; `null` until `completed()` has run */
      override def getContent: Array[Byte] = content

      /** Advertises the maximum possible capacity so the sender is never throttled. */
      override def capacityIncrement(): Int = Int.MaxValue

      /**
       * Appends the remaining bytes of `src` to the internal buffer.
       *
       * @param src         chunk of body data; ignored when `null`
       * @param endOfStream unused here; decoding happens in `completed()`
       */
      override def data(src: ByteBuffer, endOfStream: Boolean): Unit = {
        if (src != null) {
          if (src.hasArray) buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining())
          else while (src.hasRemaining) buffer.append(src.get)
        }
      }

      /**
       * Decodes the buffered bytes, publishes them to the callback and clears the buffer.
       * An exception thrown by the decoder is not caught here and propagates to the caller.
       */
      override def completed(): Unit = {
        this.content = encoding(buffer.toByteArray)
        if (resultCallback != null) resultCallback.completed(content)
        releaseResources()
      }

      /** Discards any buffered body bytes. */
      override def releaseResources(): Unit = buffer.clear()
    }
    

    【讨论】:

    • 对于小型消息有效负载看起来很合理。
    • @ok2c 是的,确实可以。我使用异步客户端,是因为我需要经典客户端所不具备的 HTTP/2 支持。