【问题标题】:Pausing a readable stream in Node.js在 Node.js 中暂停可读流
【发布时间】:2017-01-21 18:27:21
【问题描述】:

我正在使用csv-to-json,这是一个处理 CSV 文件的简洁库。

我有一个用例,我需要处理一个大型(>200 万行)CSV 并将其插入数据库。

为了在不遇到内存问题的情况下执行此操作,我打算将 CSV 作为流处理,每 10000 行暂停一次流,将行插入我的数据库中,然后恢复流。

由于某种原因,我似乎无法pause 流。

以下面的代码为例:

const rs = fs.createReadStream("./foo.csv");
rs.pause();

let count = 0;

csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})

count 被记录了 200 次(这是我的 CSV 中有多少行) - 我希望它不会记录任何内容,因为流在​​传递给 fromStream() 之前已暂停

【问题讨论】:

  • 您在数据库中一次插入一行?为什么不创建一个队列并限制同时执行的请求或使用一些异步方法来防止内存泄漏并避免刷新请求?
  • @AsifSaeed 除了暂停流或有关这是否可行的信息之外,我对其他任何事情都不感兴趣 - 无论如何谢谢

标签: node.js stream


【解决方案1】:

我利用了 csvtojson 也有fromString(...) 方法这一事实,并使用它如下。

  1. 使用 line-by-line 包读取固定数量的行,即 10000 行并将它们存储在数组中。
  2. 使用lr.pause()暂停逐行阅读器。
  3. 在索引 0 处插入标题行(如果您的 csv 文件有标题行,则使用简单的条件语句忽略逐行阅读器返回的第一行)。
  4. 使用 EOL 字符连接所有行,这将为您提供该 CSV 文件的 10000 行的字符串表示形式。
  5. 使用csvtojson的.fromString(...)将block的字符串表示转化为json对象插入到db中。
  6. 通过lr.resume() 恢复流并重复直到逐行阅读器发出'end' 事件。

完整代码

const CSVToJSON = require("csvtojson");
const LineByLineReader = require("line-by-line");
const { EOL } = require("os");

const BLOCK_LIMIT = 10000;

let lines = [];
let isFirstLineProcessed = false;

const lr = new LineByLineReader("./foo.csv");

lr
.on("line", (line) => {

    // remove this if statement if your CSV does not contain headers line
    if (!isFirstLineProcessed) {
        isFirstLineProcessed = true;
        return;
    }

    lines.push(line);

    if (lines.length === BLOCK_LIMIT) {
        lr.pause();

        // insert headers string ("field1, field2, ...") at index 0;
        lines.splice(0, 0, headers);

        // join all lines using newline operator ("\n") to form a valid csv string
        const csvBlockString = lines.join(EOL);
        const entries = [];

        lines = [];      

        csv()
            .fromString(csvBlockString)
            .on("json", (json) => {
                entries.push(json);
            })
            .on("done", () => {
                this._insertEntries(db, entries, ()=> {
                    lr.resume();
               });
            });
    }
})
.on("end", () => {
    console.log("done");
});

【讨论】:

    【解决方案2】:

    这是库创建者建议的解决方案,在此Issue 中进行了跟踪:

    var tmpArr=[];
    rs.pipe(csv({},{objectMode:true})).pipe(new Writable({
      write: function(json, encoding,callback){
        tmpArr.push(json);
        if (tmpArr.length===10000){
          myDb.save(tmpArr,function(){
            tmpArr=[];
            callback();
          })
        }else{
          callback();
        }
      } ,
      objectMode:true
    }))
    .on('finish',function(){
      if (tmpArr.length>0){
        myDb.save(tmpArr,function(){
          tmpArr=[];
        })
      }
    })
    

    我实际上已经设法通过像这样取消管道来模拟暂停,但这并不理想:

    let count = 0;
    var csvParser=csv()
    .fromStream(rs)
    .on("json", (json) => {
      rows.push(json);
      if (rows.length % 1000 === 0) {
        rs.unpipe();
        // clear `rows` right after `unpipe`
        const entries = rows;
        rows = [];
        this._insertEntries(db, entries, ()=> {
          rs.pipe(csvParser);
        });
      }
    })
    

    【讨论】:

    • 使用可写流是一个不错的主意,因为它能够暂停它并在其间执行诸如数据库更新之类的事情。感谢分享!
    • 嗨,我实现了第一段代码。它有效,问题是,对于大文件(> 500,000 行),节点以某种方式启动了一个与当前进程并行的新进程。为了清楚起见,我放了一个count 变量来计算已读取的行数。终端打印count 从 100(我的间隔)开始,然后是 200,然后是 300,等等。在达到 ~ 500,000 之后,还有另一行也从 100 开始并增加。无论如何,第一个 count(现在 > 500,000)一直在增加。
    • @TriNguyen,请务必查看maxRowLength。刚刚跑了 880K 行没有任何问题。
    【解决方案3】:

    除非你修改 csv2json 库,否则你不能这样做。

    这是您应该首先阅读的链接
    https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states

    当您执行 rs.pause() 时,流处于暂停模式。事实上,即使您不这样做,可读流也会以暂停模式启动。

    流在 3 种情况下进入resume

    • 要么有 .on('data') 事件侦听器,要么
    • 有一个 .pipe() 方法附加或
    • readable.resume() 被显式调用。

    在您的情况下,fromStream() 方法将pipe 方法附加到您的可读流,从而恢复了流。

    参考代码:
    https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378

    Converter.prototype.fromStream=function(readStream,cb){
      if (cb && typeof cb ==="function"){
        this.wrapCallback(cb);
      }
      process.nextTick(function(){
        readStream.pipe(this);
      }.bind(this))
      return this;
    }
    

    【讨论】:

      猜你喜欢
      • 2021-11-07
      • 2021-03-03
      • 1970-01-01
      • 2022-01-09
      • 1970-01-01
      • 2020-10-30
      • 1970-01-01
      • 1970-01-01
      • 2015-10-27
      相关资源
      最近更新 更多