【问题标题】:Extract binary values from stream with low memory consumption从低内存消耗的流中提取二进制值
【发布时间】:2026-01-11 20:25:02
【问题描述】:

我正在使用 ExpressJS 构建一个 NodeJS 服务器,用于处理从桌面应用通过 POST 请求 发送的数据(50KB>100MB)待处理并退回。桌面应用 gzip 在发送前压缩数据(50KB 变为 4KB)。

我希望服务器解压缩数据,从数据中提取值(字符串、整数、字符、数组、json 等),处理该数据,然后用处理后的数据进行响应。

我是从这个开始的:

apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
    let outputData;
    //extract values from req.body Buffer and do math on them.
    //save processed data in outputData

    res.json({
        status: true,
        data: outputData
    });
});

之所以有效,是因为 body-parser 将数据解压缩到存储在内存中的 Buffer req.body 中。这是我的主要问题......内存使用情况。我不想将整个数据集存储在内存中。


为了解决这个问题,我删除了 body-parser,而是将请求流直接通过管道传输到 zlib 转换流中:

apiRoute.route("/convert").post((req, res) =>{
    req.pipe(zlib.createGunzip());
});

现在的问题是我不知道如何从流中提取二进制值。


这是我希望能够做到的:

apiRoute.route("/convert").post((req, res) =>{
    let binaryStream = new stream.Transform();

    req
    .pipe(zlib.createGunzip())
    .pipe(binaryStream);

    let aValue = binaryStream.getBytes(20);//returns 20 bytes
    let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
    //etc...

});

但是我不知道有什么方法可以做到这一点。 Dissolve 之类的模块很接近,但是它们需要您提前设置解析逻辑,并且所有抓取的值都存储在内存中。

另外,如果不将 outputData 全部加载到内存中,我不知道如何响应。


所以我的问题是,我该如何...

  • 以我自己的异步速率从流中读取数据并提取其中的值
  • 将处理后的数据发送回桌面应用程序,而不将其全部放入内存中

【问题讨论】:

    标签: node.js express asynchronous stream gzip


    【解决方案1】:

    我解决了自己的问题。我不是 100% 确信这是实现此目标的最佳方式,因此我愿意接受建议。

    我创建了stream.Transform 的子类并实现了_transform 方法。我发现只有在调用 _transform 回调时才会输入下一个数据块。知道了这一点,我将该回调函数存储为一个属性,并且仅在需要下一个块时调用它。

    getBytes(size) 是一种从当前块中获取指定字节数(也保存为属性)并在需要下一个块时调用先前保存的回调的方法。这是递归完成的,以解决不同大小的块和不同数量的请求字节。

    然后结合使用 async/await 和 Promise,我能够使整个过程保持异步 (afaik) 和背压。

    const {Transform} = require('stream'),
    events = require('events');
    
    class ByteStream extends Transform{
    
        constructor(options){
            super(options);
    
            this.event_emitter = new events.EventEmitter();
            this.hasStarted = false;
            this.hasEnded = false;
            this.currentChunk;
            this.nextCallback;
            this.pos = 0;
    
            this.on('finish', ()=>{
                this.hasEnded = true;
                this.event_emitter.emit('chunkGrabbed');
            });
        }
    
        _transform(chunk, enc, callback){
            this.pos = 0;
            this.currentChunk = chunk;
            this.nextCallback = callback;
    
            if(!this.hasStarted){
                this.hasStarted = true;
                this.event_emitter.emit('started');
            }
            else{
                this.event_emitter.emit('chunkGrabbed');
            }
        }
    
        doNextCallback(){
            return new Promise((resolve, reject) =>{
                this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
                this.nextCallback();
            });
        }
    
        async getBytes(size){
            if(this.pos + size > this.currentChunk.length)
            {
                let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);
    
                if(!this.hasEnded)
                {
                    var newSize = size-(this.currentChunk.length - this.pos);
                    //grab next chunk
                    await this.doNextCallback();
                    if(!this.hasEnded){
                        this.pos = 0;
                        let recurseBytes; await this.getBytes(newSize).then(bytes => {recurseBytes = bytes;});
                        bytes = Buffer.concat([bytes, recurseBytes]);
                    }
                }
    
                return bytes;
            }
            else{
                let bytes = this.currentChunk.slice(this.pos, this.pos+size);
                this.pos += size;
                return bytes;
            }
        }
    }
    
    module.exports = {
        ByteStream : ByteStream 
    }
    

    我现在的快速路线是:

    apiRoute.route("/convert").post((req, res)=>{
    
        let bStream = new ByteStream({});
        let gStream = zlib.createGunzip();
    
        bStream event_emitter.on('started', async () => {
            console.log("started!");
    
            let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
            console.log(myValue.length);
        });
    
        req
        .pipe(gStream)
        .pipe(bStream);
    });
    

    通过检查started 事件,我可以知道第一个块何时流入bStream。从那里开始,只需使用我想要的字节数调用getBytes(),然后将承诺的值分配给一个变量。它可以满足我的需要,虽然我还没有进行任何严格的测试。

    【讨论】: