【问题标题】:Pausing readline in Node.js在 Node.js 中暂停 readline
【发布时间】:2014-01-24 19:37:45
【问题描述】:

考虑下面的代码......我试图在阅读前 5 行后暂停流:

var fs          = require('fs');
var readline    = require('readline');
var stream      = require('stream');
var numlines    = 0;
var instream    = fs.createReadStream("myfile.json");
var outstream   = new stream;
var readStream = readline.createInterface(instream, outstream);
readStream.on('line', function(line){
  numlines++;
  console.log("Read " + numlines + " lines");
  if (numlines >= 5) {
    console.log("Pausing stream");
    readStream.pause();
  }
});

输出(下一个复制)表明它在暂停后继续读取行。也许 readline 已经在缓冲区中排队了几行,并且无论如何都将它们提供给我......如果它继续在后台异步读取,这将是有意义的,但根据文档,我不知道是什么应有的行为。关于如何达到预期效果有什么建议吗?

Read 1 lines
Read 2 lines
Read 3 lines
Read 4 lines
Read 5 lines
Pausing stream
Read 6 lines
Pausing stream
Read 7 lines

【问题讨论】:

    标签: node.js


    【解决方案1】:

    因此,即使在 pause() 之后,readline 流也倾向于“滴落”(即泄漏一些额外的行)。文档没有说明这一点,但确实如此。

    如果您希望 pause() 切换立即出现,您必须创建自己的行缓冲区并自己累积剩余的行。

    【讨论】:

      【解决方案2】:

      有点不直观,the pause methods does not stop queued up line events

      调用rl.pause() 不会立即暂停readline.Interface 实例发出的其他事件(包括'line')。

      但是,有一个名为 line-by-line 的第 3 方模块,其中 pause 暂停 line 事件,直到它恢复为止。

      var LineByLineReader = require('line-by-line'),
          lr = new LineByLineReader('big_file.txt');
      
      lr.on('error', function (err) {
        // 'err' contains error object
      });
      
      lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();
      
        // ...do your asynchronous line processing..
        setTimeout(function () {
      
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
      });
      
      lr.on('end', function () {
        // All lines are read, file is closed now.
      });
      

      (我与该模块无关,只是发现它对处理这个问题很有用。)

      【讨论】:

      • 感谢您的回答。出于兴趣,这样的要求有多普遍?我正在解析需要流式传输到服务器的 80GB CSV。还有哪些其他用例?
      • @ZachSmith 我发现当回调不能或不应该同步完成时(例如,将行插入数据库),能够随意暂停和恢复非常有用。如果读取行的速度比处理它们的速度快,则可能会排队太多请求并耗尽内存。
      • 我的用例是读取一个大型 (
      【解决方案3】:

      补充几点:

      .on('pause', function() {
          console.log(numlines)
      })
      

      你会得到5。它在node.js document中提到:

      • 输入流未暂停并且接收 SIGCONT 事件。 (参见事件 SIGTSTP 和 SIGCONT)

      所以,我在线路事件中创建了一个 tmp 缓冲区。使用标志来判断是否触发暂停。

      .on('line', function(line) {
         if (paused) {
            putLineInBulkTmp(line);
         } else {
            putLineInBulk(line);
         }
      }
      

      然后在 on pause 和 resume:

      .on('pause', function() {
          paused = true;
          doSomething(bulk, function(resp) {
              // clean up bulk for the next.
              bulk = [];
              // clone tmp buffer.
              bulk = clone(bulktmp);
              bulktmp = [];
              lr.resume();
          });
      })
      .on('resume', () => {
        paused = false;
      })
      

      用这种方式来处理这种情况。

      【讨论】:

      • 我认为在bulk = clone(bulktmp);lr.resume(); 之间可能会丢失一些数据
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-30
      • 1970-01-01
      • 1970-01-01
      • 2022-11-18
      • 1970-01-01
      相关资源
      最近更新 更多