【发布时间】:2023-04-04 10:54:02
【问题描述】:
我需要两个不同的侦听器来读取来自可读流的输入。 他们似乎有竞争条件,我想确保我做对了......
当我使用 .on('data') - 每个侦听器都返回相同的块并可以读取它。
当我使用 .on('readable') - 捕获事件的第一个侦听器将从缓冲区读取,而第二个侦听器将获得一个空缓冲区?
意思是当我有两个监听器时我不能使用 .on('readable') 事件?
【问题讨论】:
标签: node.js
我需要两个不同的侦听器来读取来自可读流的输入。 他们似乎有竞争条件,我想确保我做对了......
当我使用 .on('data') - 每个侦听器都返回相同的块并可以读取它。
当我使用 .on('readable') - 捕获事件的第一个侦听器将从缓冲区读取,而第二个侦听器将获得一个空缓冲区?
意思是当我有两个监听器时我不能使用 .on('readable') 事件?
【问题讨论】:
标签: node.js
当发出 可读 事件时,两个侦听器都会进行侦听。但是,第一个调用 read() 函数将获取数据,而第二个调用将获取一个空缓冲区,因为它是相同的输入流被读取两次并且只存在一个块。
当发出 data 事件时,侦听器会收到从流中读取的数据块。所以所有的监听器都会收到相同的数据。
[编辑]详细说明它的工作原理:
所有可读流以 2 种模式开始:流动和暂停。默认情况下,所有可读流都以暂停模式开始,但可以使用以下三种方法中的任何一种将它们切换到流动模式:
1. 将“数据”事件处理程序附加到流
2.调用stream.resume()方法
3.调用stream.pipe()方法将数据发送到Writable。
当您使用上述任何一种方法时,流就会开始流动。它不关心 data 侦听器是否附加到流并且有可能丢失数据。在内部,在流上调用 read() 方法,并且读取内部缓冲区中累积的任何数据并将其发送给侦听器。内存使用率很低。
当您在流上附加 可读 侦听器时,它会优先于 data 侦听器,因此您的流仍处于 暂停 模式.在暂停模式下,您必须通过调用 read() 方法从内部缓冲区中显式读取数据。当可读数据可用时,它会继续在内部缓冲区中累积,直到显式调用 read() 方法或恢复流。您可以指定要从内部缓冲区读取的块的大小(以字节为单位)或返回所有可用数据。当 read() 被调用时,data 事件也会与读取的数据块一起发出。消费完这些数据后,内部缓冲区被清空。因此,当您附加了多个 可读 事件并尝试从同一个内部缓冲区消费时,您将无法多次获取相同的数据。
我对您的建议是只有一个 可读 侦听器和多个 data 侦听器。拥有 可读 将使您可以灵活地在需要时阅读而不会丢失任何数据。并且使用 data 事件处理程序,您将能够在所有处理程序中获取该数据。
【讨论】:
TL;DR:data 事件会将数据发送给任意数量的侦听器,但前提是侦听器会在读取数据之前附加,因此请使用pipe 方法将数据复制到两个PassThrough 流并分别从每个流中读取。
详细说明如下:
首先我们需要了解:单个流对于每个数据块只能产生一次data 事件。这是因为流对象不“知道”谁在读取 - 响应被读取,仅此而已。
现在,只要阅读您的问题,我就会回答 - 怎么会?该事件将运行两次,因为每个数据事件侦听器都会被调用...除非您在readable 事件中实际设置了侦听器...
假设您设置了这样的东西:
response.on("data", function A(chunk) {...});
response.on("readable", function B() {
response.on("data", function C(chunk) {...});
});
现在当一个块1 来到A() 将收到块:1。如果之后流drains(意味着响应太慢以至于无法提供任何新数据)它将暂停。只有在那之后,流可能再次变为readable,然后B() 函数将为C() 设置一个侦听器... 但是... 函数A() 已经在读取chunk当我们这样做时,C() 将只在下一个流上工作。
现在正如 Swati Anand 所提到的,您可以使用相同的事件侦听器 - 但在我看来,这会扼杀流的全部美感。使用pipe 会更优雅,如下所示:
const PassThrough = require('stream').PassThrough;
const data1 = new PassThrough();
const data2 = new PassThrough();
response.pipe(data1).on("data", function A(){...});
response.pipe(data2);
// when you're ready
data2.on("data", function C(){...});
这背后的想法是——我们不监听原始流上的任何事件,而是创建两个类似于“克隆”的传递事件并在那里读取data。这样程序的每个部分都不会丢失任何数据。
node.js stream docs 实际上是关于数据流的最佳读物。试着把它读成一个管道故事——连接管道等。
【讨论】:
// when you're ready 部分后才会读取响应 - 否则响应仍会暂停。
您可以使用 RxJS 使其更容易,如下所示
const { Observable, fromStream } = require('rxjs');
const { share } = require('rxjs/operators');
// By declaring this observable, the same stream will be use for every subscribers
const stream$ = Observable.fromStream(yourStream).share();
const subscriber1 = stream$.subscribe(console.log);
const subscriber2 = stream$.subscribe(console.log);
【讨论】:
tl;dr 使用 PassThrough 流
const stream = require('stream');
const pt = new stream.PassThrough();
response.pipe(pt);
pt.on("data", data => {
// observe the data
);
【讨论】:
所以,没有一个答案对我很有效。不过这样做了:
let b = 0
const passReceiver = new PassThrough()
const passWatcher = new PassThrough()
passWatcher.on('data', chunk => {
console.log('data: ', b++) // or any other "watching" you may desire
passReceiver.write(chunk)
})
passWatcher.on('end', () => {
console.log('passWatcher end')
passReceiver.end()
})
// that's the setup, then you pipe into passWatcher and out of passReceiver
SomeReadableStream.pipe(passWatcher)
passReceiver.pipe(someWritableStream)
【讨论】: