【问题标题】:Node.js stream writing to MongoDB - concerned about performanceNode.js 流写入 MongoDB - 关注性能
【发布时间】:2017-04-08 17:32:48
【问题描述】:

我需要读取包含数千行的日志文件并将每一行写入 Mongo 数据库。我正在使用节点流读取文件。我正在使用“split”npm 包将文件拆分为“行”。由于网络考虑,MongoDB 写入将比日志文件读取花费更长的时间。

我的核心代码如下所示:

var readableStream = fs.createReadStream(filename);

            readableStream
                .pipe(split()) // This splits the data into 'lines'
                .on('data', function (chunk) {

                    chunkCount++;
                    slowAsyncFunctionToWriteLogEntryToDatabase(chunk); // This will take ages

                })
                .on('end', function () {
                    // resolve the promise which bounds this process
                    defer.resolve({v:3,chunkCount: chunkCount})

                });

我是否需要担心 MongoDB 系统会受到排队写入次数的影响?大概节点管道背压机制不会知道大量的数据库写入正在排队?有什么方法可以“减慢”可读流,以便它在从日志文件中读取下一行之前等待每个 MongoDB 插入完成?我是否有不必要的担心?

【问题讨论】:

  • 我猜你可以阅读整个日志文件,然后使用 db.collection.insertMany() 将所有文档插入单个 db 调用中。它会更快

标签: node.js mongodb node-streams


【解决方案1】:

由于使用pause()resume() 似乎有一些问题。我将编写另一个选项,即使用转换流。

var Transform = require('stream').Transform;

var myTransform = new Transform({
   transform(chunk, encoding, cb) {
      chunkCount++;

      syncFunctionToWriteLogEntryWithCallback( chunk, function() {
         cb();
      } );
  },

  flush(cb) {
      chunkCount++;
      syncFunctionToWriteLogEntryWithCallback( chunk, function() {
         cb();
      } );
  }
});

readableStream
        .pipe( split() )
        .pipe( myTransform );

使用转换流允许您在处理完流时提供回调。

【讨论】:

  • 太好了,现在的行为完全符合我的要求 - 它允许在流移动到下一行之前完全处理每一行。完美的解决方案,虽然现在的性能很糟糕,大概是因为我将请求推送到 Mongo 的方式!非常感谢您的帮助。
  • 性能改进可能是使用另一个 mongoDB 插入函数。有bulk
  • 我将插入更改为插入文档数组而不是单个文档,这极大地提高了性能。一切准备就绪!再次感谢您的帮助:)
  • @dinchev - 遇到了类似的问题并尝试了批量处理,但我这样做的方法是在 dataexecute 上调用 end 上的插入,这实际上使流式传输无用并且代码在火焰中死去堆内存:) 这个答案stackoverflow.com/a/33360069/856498 但是在拉入一个巨大的 gzip 压缩 csv 并在转换后插入行时效果非常好。
【解决方案2】:

您可以在可读流中使用pause method 在将块写入 mongodb 时停止流。

readableStream
            .pipe(split()) // This splits the data into 'lines'
            .on('data', function (chunk) {

                readableStream.pause()

                chunkCount++;

                syncFunctionToWriteLogEntryWithCallback( chunk, function() {
                    readableStream.resume();
                } );

            })
            .on('end', function () {
                // resolve the promise which bounds this process
                defer.resolve({v:3,chunkCount: chunkCount})

            });

我认为在这种情况下 MongoDB 不会出现重大问题。

【讨论】:

  • 谢谢。我确实看过 .pause() 但文档指出 .pause() 不会立即停止流,但在暂停发生之前可能会传递几个进一步的块。如果此时已经调用了 resume,那么在我看来,.pause() 的这一方面可能会完全否定预期的效果。但我会用它做实验,看看它的表现如何。
  • 所以,我添加了 pause() 和 resume()。对于具有 38508 行的日志文件,当我们到达流的“结尾”时,37913 次 Mongo 写入仍在“排队”等待处理,这表明 pause()/resume() 机制并没有真正帮助节流循环。我真的很想要一种保持读/写同步的机制。
  • 嗯。好的!也许使用Transform stream 会更有效。我会尝试举个例子
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-11-05
  • 2012-06-18
  • 2012-08-18
相关资源
最近更新 更多