【问题标题】:How to write batches of data in NodeJS stream pipeline?如何在 NodeJS 流管道中写入批量数据?
【发布时间】:2022-11-03 01:08:37
【问题描述】:

我有一个函数,我使用“管道”方法将 CSV 文件作为可读流读取,按行拆分并转换每行的数据,然后将数据添加到数组中。管道完成后,我将所有数据插入数据库。

这是代码的相关部分:

pipeline(storageStream as Readable, split(), this.FilterPipe(), this.MapData(result));

public MapData(result: Array<string>): MapStream {
    return mapSync((filteredData: string) => {
      const trimmed: string = filteredData.trim();
      if (trimmed.length !== 0) {
        result.push(trimmed);
      }
    });
}

由于我们上传了大量非常大的 CSV 文件,我们有时会遇到内存限制,因此我们决定尝试将逻辑拆分为插入批次,这样我们就不会同时使用大量内存。

所以我想分批处理读取的数据,其中每批(假设文件中有100行),我将触发“MapData”函数并将结果数组插入数据库。

是否有任何选项可以添加条件,以便每 X 行触发一次 MapData? 或者,是否有任何其他可能满足要求的解决方案?

提前致谢!

【问题讨论】:

    标签: node.js typescript pipeline event-stream


    【解决方案1】:

    以下代码显示了一个转换流,它缓冲传入的对象(或对象数组)直到它有 100 个对象,然后将它们作为数组向前推:

    var t = new stream.Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        this.buffer = (this.buffer || []).concat(chunk);
        if (this.buffer.length >= 100) {
          this.push(this.buffer);
          this.buffer = [];
        }
        callback();
      },
      flush(callback) {
        if (this.buffer.length > 0) this.push(this.buffer);
        callback();
      }
    }).on("data", console.log);
    for (var i = 0; i < 250; i++) t.write(i);
    t.end();
    

    您可以在pipeline 中包含这样的转换流。

    【讨论】:

    • 嗨海科,谢谢你的回答!我试图实现它,但遇到了“'Transform'类型上不存在属性'缓冲区'”的问题。也许我错过了什么?
    猜你喜欢
    • 1970-01-01
    • 2017-02-09
    • 2021-10-25
    • 1970-01-01
    • 1970-01-01
    • 2017-04-21
    • 1970-01-01
    • 1970-01-01
    • 2015-05-17
    相关资源
    最近更新 更多