【问题标题】:NodeJS, promises, streams - processing large CSV filesNodeJS、promise、streams - 处理大型 CSV 文件
【发布时间】:2016-01-12 19:09:21
【问题描述】:

我需要构建一个函数来处理用于 bluebird.map() 调用的大型 CSV 文件。鉴于文件的潜在大小,我想使用流式传输。

这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的块),并在文件被读取结束(已解决)或错误(被拒绝)时返回一个承诺。

所以,我开始:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

现在,我有两个相互关联的问题:

  1. 我需要限制正在处理的实际数据量,以免造成内存压力。
  2. 作为processor 参数传递的函数通常是异步的,例如通过基于承诺的库(现在:pg-promise)将文件的内容保存到数据库。因此,它会在记忆中创造一个承诺,并不断地继续前进。

pg-promise 库具有管理此问题的函数,例如 page(),但我无法提前说明如何将流事件处理程序与这些 Promise 方法混合使用。现在,我在每个read() 之后的readable 部分的处理程序中返回一个promise,这意味着我创建了大量承诺的数据库操作并最终因为我达到进程内存限制而出错。

有没有人有一个可以用作跳跃点的工作示例?

更新:可能不止一种给猫剥皮的方法,但这是可行的:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

有人发现这种方法存在潜在问题吗?

【问题讨论】:

  • 看起来很整洁,如果可行,那就太好了!我很高兴最近将page 添加到pg-promise 中并没有白费;)
  • 只是在 readDataFromStream 的末尾简化了它;)你不需要return undefined,当你什么都不返回时会发生这种情况;)
  • 其实这个可能有问题...调用db.task的时候,你不处理它的结果,所以如果它拒绝,就会抛出一个错误承诺库,您的拒绝未被处理。
  • 我应该在task() 上使用catch() 进行return this.page() 吗?
  • 我已经更新了我的答案 - 它为您提供了如何解决问题的全貌。

标签: node.js promise bluebird pg-promise


【解决方案1】:

所以说您不想要流式传输而是某种数据块? ;-)

你知道https://github.com/substack/stream-handbook吗?

我认为在不改变架构的情况下最简单的方法是某种承诺池。例如https://github.com/timdp/es6-promise-pool

【讨论】:

  • 嗯,我想过在函数中使用async.queue,返回一个最终完成文件的承诺(或不完成)。但是,我想知道如何将像 Bluebird 这样的 Promise 库与典型的基于流的大文件处理联系起来。 ('pg-promise`包括spex,它提供了更高级别的promise函数)
【解决方案2】:

在下面找到一个完整的应用程序,它可以正确执行您想要的相同类型的任务:它将文件作为流读取,将其解析为 CSV 并将每一行插入到数据库中。

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });

请注意,我唯一更改的是:使用库 csv-parse 而不是 csv,作为更好的选择。

添加了对来自spex 库的方法stream.read 的使用,该方法可以正确地为Readable 流提供与promise 一起使用的服务。

【讨论】:

  • 这难道不是在query("INSERT…") 完成后尝试从parser 读取下一项,而不管下一项是否已经可读?还是parser.read() 会返回一个承诺?
  • 另外,OP 正在寻找的返回 promise 的 processor 回调函数发生了什么?
  • @Bergi 我的理解是 parser.read() 是同步的,就像它显示的那样。如果事实证明不是,那么显然需要将其包装成一个承诺。并且readable 被触发一次,而不是每次读取操作,这是我的理解。至于返回承诺的处理器,他只是在数据处理完成时寻找解决方案,并在失败时拒绝,我的示例提供了这一点,即任务将相应地解决/拒绝。
  • 嗯,我需要再次阅读stream docs,但我不认为它是这样工作的
  • 是的,我自己对流部分不太确定,我根据问题提供的代码编写了示例。如果那个代码是错误的,那么我的也是。但是,它确实显示了一般方法。
【解决方案3】:

你可能想看看promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

适用于背压和一切。

【讨论】:

    【解决方案4】:

    我找到了一种更好的方法来做同样的事情;有更多的控制权。这是一个具有精确并行控制的最小骨架。以并行值为一,所有记录按顺序处理,而无需将整个文件放在内存中,我们可以增加并行值以加快处理速度。

          const csv = require('csv');
          const csvParser = require('csv-parser')
          const fs = require('fs');
    
          const readStream = fs.createReadStream('IN');
          const writeStream = fs.createWriteStream('OUT');
    
          const transform = csv.transform({ parallel: 1 }, (record, done) => {
                                               asyncTask(...) // return Promise
                                               .then(result => {
                                                 // ... do something when success
                                                 return done(null, record);
                                               }, (err) => {
                                                 // ... do something when error
                                                 return done(null, record);
                                               })
                                           }
                                         );
    
          readStream
          .pipe(csvParser())
          .pipe(transform)
          .pipe(csv.stringify())
          .pipe(writeStream);
    

    这允许为每条记录执行异步任务。

    要返回一个承诺,我们可以返回一个空承诺,并在流结束时完成它。

        .on('end',function() {
          //do something wiht csvData
          console.log(csvData);
        });
    

    【讨论】:

      猜你喜欢
      • 2015-02-20
      • 2015-10-10
      • 2018-02-23
      • 2017-07-05
      • 2014-08-12
      • 2014-09-24
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多