【问题标题】:Implementing a buffered transform stream实现缓冲的转换流
【发布时间】:2013-12-17 12:56:19
【问题描述】:

我正在尝试使用new Node.js streams API 实现一个流,它将缓冲一定数量的数据。当这个流通过管道传输到另一个流时,或者如果某些东西消耗了readable 事件,这个流应该刷新它的缓冲区,然后简单地成为传递。关键是,该流将通过管道传输到许多其他流,并且当附加每个目标流时,必须刷新缓冲区即使它已经被刷新到另一个流

例如:

  1. BufferStream 实现了stream.Transform,并保留了一个 512KB 的内部环形缓冲区
  2. ReadableStreamA 通过管道传送到 BufferStream 的实例
  3. BufferStream 写入其环形缓冲区,并从 ReadableStreamA 读取数据。(数据是否丢失无关紧要,因为缓冲区会覆盖旧数据。)
  4. BufferStream 通过管道传送到 WritableStreamB
  5. WritableStreamB 接收整个 512KB 缓冲区,并继续获取从 ReadableStreamABufferStream 写入的数据。
  6. BufferStream 通过管道传送到 WritableStreamC
  7. WritableStreamC 也接收整个 512KB 缓冲区,但这个缓冲区现在与 WritableStreamB 接收的不同,因为更多数据已写入 BufferStream

这可以通过流 API 实现吗?我能想到的唯一方法是使用一种方法创建一个对象,该方法为每个目的地启动一个新的 PassThrough 流,这意味着我不能简单地通过管道进出它。

不管怎样,我已经使用旧的“流动”API 完成了这项工作,只需在 data 事件上侦听新的处理程序。当.on('data') 附加一个新函数时,我会直接使用环形缓冲区的副本调用它。

【问题讨论】:

  • 一个疑问:数据是否仅在 512KB 突发中移动,还是只有第一个为 512KB?
  • @user568109 当某物开始从缓冲流接收数据时,它应该接收初始的 512KB 缓冲区(仅一次),然后在数据可用时继续通过缓冲流接收数据。只有第一个块是 512KB(或者缓冲区的大小)。

标签: javascript node.js stream


【解决方案1】:

这是我对你的问题的看法。

基本思想是创建一个Transform 流,这将允许我们在将数据发送到流的输出之前执行您的自定义缓冲逻辑:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)

  this.push(chunk)
  done()
}

然后,我们需要重写.pipe() 方法,以便在BufferStream 被传送到流中时收到通知,这允许我们自动向其中写入数据:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

这样,当我们写buffer.pipe(someStream)时,我们按预期执行管道并将内部缓冲区写入输出流。之后,Transform 类负责处理所有事情,同时跟踪背压等。

这是working gist。请注意,我没有费心编写正确的缓冲逻辑(即,我不关心内部缓冲区的大小),但这应该很容易解决。

【讨论】:

  • 我认为这很接近但不是 100% 正确。在第一次调用管道之前,转换实现需要将所有内容放到缓冲区中,然后一旦调用管道,然后切换到调用 this.push。
  • 还有_flush()方法呢
【解决方案2】:

Paul 的回答很好,但我认为它不符合确切的要求。听起来需要发生的是每次在此转换流上调用 pipe() 时,它需要首先刷新表示在转换流创建/(连接到源流)和它连接到当前可写/目标流的时间。

这样的说法可能更正确:

  var BufferStream = function () {
        stream.Transform.apply(this, arguments);
        this.buffer = []; //I guess an array will do
    };

    util.inherits(BufferStream, stream.Transform);

    BufferStream.prototype._transform = function (chunk, encoding, done) {

        this.push(chunk ? String(chunk) : null);
        this.buffer.push(chunk ? String(chunk) : null);

        done()
    };

    BufferStream.prototype.pipe = function (destination, options) {
        var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
        this.buffer.forEach(function (b) {
            res.write(String(b));
        });
        return res;
    };


    return new BufferStream();

我猜是这样的:

BufferStream.super_.prototype.pipe.apply(this, arguments);

等价于:

stream.Transform.prototype.pipe.apply(this, arguments);

您可能会对此进行优化,并在调用管道/取消管道时使用一些标志。

【讨论】:

  • 这个问题当然是它正在缓冲所有数据,而且它永远不会停止缓冲,所以这很容易导致内存“泄漏”,除非你小心。可能适用于短期程序,但不适用于服务器等。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-01-03
  • 1970-01-01
  • 1970-01-01
  • 2018-04-15
  • 2018-08-02
相关资源
最近更新 更多