【问题标题】:NodeJS Stream splittingNodeJS 分流
【发布时间】:2023-07-23 19:21:01
【问题描述】:

我有一个来自分叉进程的无限数据流。我希望这个流由一个模块处理,有时我想复制这个流中的数据以由另一个模块处理(例如监视数据流,但如果发生任何有趣的事情,我想记录下一个 n 字节以归档进一步调查)。

那么让我们假设以下场景:

  1. 我启动程序并开始消费可读流
  2. 2 秒后,我想用不同的流读取器处理相同的数据 1 秒
  3. 时间一到,我想关闭第二个消费者,但原始消费者必须保持不变。

这是一个代码sn-p:

var stream = process.stdout;

stream.pipe(detector); // Using the first consumer

function startAnotherConsumer() {
    stream2 = new PassThrough();
    stream.pipe(stream2);

    // use stream2 somewhere else
}

function stopAnotherConsumer() {
    stream.unpipe(stream2);
}

我的问题是,对 stream2 进行解压并不能使其关闭。如果我在unpipe 命令之后调用stream.end(),那么它会因错误而崩溃:

events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: write after end
    at writeAfterEnd (_stream_writable.js:192:12)
    at PassThrough.Writable.write (_stream_writable.js:243:5)
    at Socket.ondata (_stream_readable.js:555:20)
    at emitOne (events.js:101:20)
    at Socket.emit (events.js:188:7)
    at readableAddChunk (_stream_readable.js:176:18)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at Pipe.onread (net.js:548:20)

我什至尝试暂停源流以帮助从第二个流中刷新缓冲区,但它也不起作用:

function stopAnotherConsumer() {
    stream.pause();
    stream2.once('unpipe', function () {
        stream.resume();
        stream2.end();
    });
    stream.unpipe(stream2);
}

这里和之前一样的错误(写在结束之后)。

如何解决问题?我最初的意图是从一个点复制流数据,然后在一段时间后关闭第二个流。

注意:我尝试使用this answer 使其工作。

【问题讨论】:

    标签: node.js stream pipe streamwriter node.js-stream


    【解决方案1】:

    由于没有答案,我发布了我的(拼凑)解决方案。万一有人有更好的,不要犹豫。

    一个新的流:

    const Writable = require('stream').Writable;
    const Transform = require('stream').Transform;
    
    class DuplicatorStream extends Transform {
        constructor(options) {
            super(options);
    
            this.otherStream = null;
        }
    
        attachStream(stream) {
            if (!stream instanceof Writable) {
                throw new Error('DuplicatorStream argument is not a writeable stream!');
            }
    
            if (this.otherStream) {
                throw new Error('A stream is already attached!');
            }
    
            this.otherStream = stream;
            this.emit('attach', stream);
        }
    
        detachStream() {
            if (!this.otherStream) {
                throw new Error('No stream to detach!');
            }
    
            let stream = this.otherStream;
            this.otherStream = null;
            this.emit('detach', stream);
        }
    
        _transform(chunk, encoding, callback) {
            if (this.otherStream) {
                this.otherStream.write(chunk);
            }
    
            callback(null, chunk);
        }
    }
    
    module.exports = DuplicatorStream;
    

    及用法:

    var stream = process.stdout;
    var stream2;
    
    duplicatorStream = new DuplicatorStream();
    stream.pipe(duplicatorStream);   // Inserting my duplicator stream in the chain
    duplicatorStream.pipe(detector); // Using the first consumer
    
    function startAnotherConsumer() {
        stream2 = new stream.PassThrough();
        duplicatorStream.attachStream(stream2);
    
        // use stream2 somewhere else
    }
    
    function stopAnotherConsumer() {
        duplicatorStream.once('detach', function () {
            stream2.end();
        });
        duplicatorStream.detachStream();
    }
    

    【讨论】: