【发布时间】:2023-07-23 19:21:01
【问题描述】:
我有一个来自分叉进程的无限数据流。我希望这个流由一个模块处理,有时我想复制这个流中的数据以由另一个模块处理(例如监视数据流,但如果发生任何有趣的事情,我想记录下一个 n 字节以归档进一步调查)。
那么让我们假设以下场景:
- 我启动程序并开始消费可读流
- 2 秒后,我想用不同的流读取器处理相同的数据 1 秒
- 时间一到,我想关闭第二个消费者,但原始消费者必须保持不变。
这是一个代码sn-p:
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
function startAnotherConsumer() {
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
我的问题是,对 stream2 进行解压并不能使其关闭。如果我在unpipe 命令之后调用stream.end(),那么它会因错误而崩溃:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
我什至尝试暂停源流以帮助从第二个流中刷新缓冲区,但它也不起作用:
function stopAnotherConsumer() {
stream.pause();
stream2.once('unpipe', function () {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
这里和之前一样的错误(写在结束之后)。
如何解决问题?我最初的意图是从一个点复制流数据,然后在一段时间后关闭第二个流。
注意:我尝试使用this answer 使其工作。
【问题讨论】:
标签: node.js stream pipe streamwriter node.js-stream