[Question Title]: Reactor - How to compress Flux&lt;ByteBuffer&gt; on the fly?
[Posted]: 2021-04-29 12:21:27
[Question]:

I need to read and write compressed (gzip/brotli) streams without intermediate storage. The data arrives from the underlying source as a Flux&lt;ByteBuffer&gt;. It is large enough that buffering it is not an option. How can I compress a Flux&lt;ByteBuffer&gt; on the fly, without holding the full data in memory or writing it to disk?

[Comments]:

  • You should be able to use the Deflater class from the JDK - it works directly with ByteBuffer since Java 11
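For reference, a minimal sketch of what that comment suggests, using the ByteBuffer overloads `Deflater.setInput(ByteBuffer)` and `Deflater.deflate(ByteBuffer)` added in Java 11 (the sample input and buffer size here are arbitrary, just for illustration):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

public class DeflaterByteBufferDemo {
    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.wrap(
                "hello hello hello hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer out = ByteBuffer.allocate(256); // ample room for this tiny input

        Deflater deflater = new Deflater();
        deflater.setInput(in);               // ByteBuffer overload, Java 11+
        deflater.finish();
        int written = deflater.deflate(out); // compresses directly into 'out'
        deflater.end();

        System.out.println("compressed to " + written + " bytes");
    }
}
```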

Tags: java spring-webflux project-reactor


[Solution 1]:

You want to avoid buffering the complete data, but you can gzip each ByteBuffer chunk individually or, if your chunks are small enough, consolidate them into groups and then gzip each group.

This does not require much memory, and it still compresses your data. The actual compression ratio depends on the content of the source data and on how many chunks you consolidate before compressing. I think you can tune this manually to get the best ratio.

A possible code example:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import reactor.core.publisher.Flux;

public class Test_GzipFlux {

/**
 * Returns Flux of gzip-ed buffers after (optional) buffer consolidation
 * @param inFlux input stream of buffers
 * @param consolidatedBufCount number of buffers to consolidate before gzip-ing
 */
public static Flux<ByteBuffer> gzipFlux(Flux<ByteBuffer> inFlux, 
                                        int consolidatedBufCount, int outChunkMaxLength) {
    return inFlux.buffer(consolidatedBufCount)
                 .map(inList->zipBuffers(inList, outChunkMaxLength));
}

/**
 * Consolidates buffers from input list, applies gzip, returns result as single buffer
 * @param inList portion of chunks to be consolidated
 * @param outChunkMaxLength estimated length of output chunk. 
 *        !!! to avoid pipe deadlock, this length to be sufficient
 *        !!! for consolidated data after gzip 
 */
private static ByteBuffer zipBuffers(List<ByteBuffer> inList, int outChunkMaxLength) {
    try {
        PipedInputStream pis = new PipedInputStream(outChunkMaxLength);
        GZIPOutputStream gos = new GZIPOutputStream(new PipedOutputStream(pis));

        for (ByteBuffer buf : inList) {
            // copy via get() so position/limit are respected and
            // non-array-backed (direct) buffers also work
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            gos.write(bytes);
        }
        gos.close();
        byte[] outBytes = pis.readNBytes(pis.available());
        pis.close();
        return ByteBuffer.wrap(outBytes);
    } catch (IOException e) {
        throw new RuntimeException(e.getMessage(), e);
    }       
}

private static void test() {
    int inLength  = ... // actual full length of source data
    Flux<ByteBuffer> source = ... // your source Flux
    
    // these are parameters for your adjustment
    int consolidationCount = 5;
    int outChunkMaxLength= 30 * 1024;

    Flux<ByteBuffer> result = gzipFlux(source,consolidationCount, outChunkMaxLength);           

    int outLen = result.reduce(0, (res, bb) -> res + bb.remaining()).block();
    System.out.println("ratio=" + (double) inLength / outLen);
}
}
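As an alternative to consolidating chunks, a single Deflater can be kept alive across the whole stream: each incoming chunk is compressed as it arrives with SYNC_FLUSH, so nothing is buffered and the compression dictionary carries over between chunks. Below is a hedged plain-JDK sketch of that idea (the chunk list stands in for the Flux; in reactive code each loop iteration would correspond to one map/concatMap step, and a shared Deflater is not thread-safe, so the stream must be processed sequentially):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class StreamingDeflateDemo {
    public static void main(String[] args) throws DataFormatException {
        // Simulated chunk stream (in real code these would arrive via Flux).
        List<byte[]> chunks = List.of(
                "first chunk, ".getBytes(StandardCharsets.UTF_8),
                "second chunk, ".getBytes(StandardCharsets.UTF_8),
                "third chunk".getBytes(StandardCharsets.UTF_8));

        // One Deflater shared across all chunks: no consolidation needed,
        // and the dictionary carries over, so the ratio does not suffer.
        Deflater deflater = new Deflater();
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        byte[] out = new byte[1024];

        for (byte[] chunk : chunks) {
            deflater.setInput(chunk);
            // SYNC_FLUSH makes everything fed so far decodable downstream.
            int n;
            while ((n = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH)) > 0) {
                compressed.write(out, 0, n);
            }
        }
        deflater.finish();
        int n;
        while ((n = deflater.deflate(out)) > 0) {
            compressed.write(out, 0, n);
        }
        deflater.end();

        // Round-trip check with Inflater.
        Inflater inflater = new Inflater();
        inflater.setInput(compressed.toByteArray());
        byte[] restored = new byte[1024];
        int len = inflater.inflate(restored);
        inflater.end();

        // prints: first chunk, second chunk, third chunk
        System.out.println(new String(restored, 0, len, StandardCharsets.UTF_8));
    }
}
```

Note this emits a raw zlib stream rather than gzip framing; for gzip output the same per-chunk approach works, but the header/trailer handling has to be added around it.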

[Discussion]:
