【发布时间】:2016-01-12 19:09:21
【问题描述】:
我需要构建一个函数来处理用于 bluebird.map() 调用的大型 CSV 文件。鉴于文件的潜在大小,我想使用流式传输。
这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的块),并在文件被读取结束(已解决)或错误(被拒绝)时返回一个承诺。
所以,我开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
现在,我有两个相互关联的问题:
- 我需要限制正在处理的实际数据量,以免造成内存压力。
- 作为
processor参数传递的函数通常是异步的,例如通过基于承诺的库(现在:pg-promise)将文件的内容保存到数据库。因此,它会在记忆中创造一个承诺,并不断地继续前进。
pg-promise 库具有管理此问题的函数,例如 page(),但我无法提前说明如何将流事件处理程序与这些 Promise 方法混合使用。现在,我在每个read() 之后的readable 部分的处理程序中返回一个promise,这意味着我创建了大量承诺的数据库操作并最终因为我达到进程内存限制而出错。
有没有人有一个可以用作跳跃点的工作示例?
更新:可能不止一种给猫剥皮的方法,但这是可行的:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人发现这种方法存在潜在问题吗?
【问题讨论】:
-
看起来很整洁,如果可行,那就太好了!我很高兴最近将
page添加到pg-promise中并没有白费;) -
只是在 readDataFromStream 的末尾简化了它;)你不需要
return undefined,当你什么都不返回时会发生这种情况;) -
其实这个可能有问题...调用db.task的时候,你不处理它的结果,所以如果它拒绝,就会抛出一个错误承诺库,您的拒绝未被处理。
-
我应该在
task()上使用catch()进行return this.page()吗? -
我已经更新了我的答案 - 它为您提供了如何解决问题的全貌。
标签: node.js promise bluebird pg-promise