【发布时间】:2016-06-01 10:48:57
【问题描述】:
我有一个 Node.js Typescript 程序,我在其中尝试逐行解析大型 CSV 文件并异步处理这些行。更具体地说,我需要一个函数:
- 打开一个 CSV 文件。
- 将下一行解析为对象。
- (理想情况下)收集一定数量的对象以进行批处理。
- 将对象传递给异步函数进行处理(返回承诺)。
- 从处理函数中收集 Promise。
一些要求和注意事项:
- 我需要对这些承诺中的任何一个进行投票以了解进展情况。
- 假设这些 CSV 文件很大;逐行流式传输是必要的。
- 我不应该在这些处理操作正在运行时阻止应用程序。
- 返回一组 promise 可能不是正确的方法,尤其是当我尝试读取包含一百万行的文件时。
- 我需要各种钩子来取消或重试失败的操作。
这是我已经开始工作的一些测试代码。 ObjectStream 是一个自定义 Node.js 转换,可将 CSV 行转换为对象。
function parseFileAsync(filePath: string): Promise<any> {
var doParseFileAsync = (filePath: string) => {
var streamDeferred = q.defer<Promise<any>[]>();
var promises: Promise<any>[] = [];
var propertyNames: string[] = [];
var stream = fs.createReadStream(filePath, { encoding: "utf8" })
.pipe(new LineStream({ objectMode: true }))
.pipe(new ObjectStream({ objectMode: true }));
stream.on("readable", () => {
var obj: Object;
while ((obj = stream.read()) !== null) {
console.log(`\nRead an object...`);
var operationDeferred = q.defer<any>();
operationDeferred.resolve(doSomethingAsync(obj));
promises.push(operationDeferred.promise);
}
});
stream.on("end", () => {
streamDeferred.resolve(promises);
});
return streamDeferred.promise;
}
return doParseFileAsync(filePath)
.then((result: Promise<any>[]) => {
return q.all(result);
});
}
parseFileAsync(filePath)
.done((result: any[]) => {
console.log(`\nFinished reading and processing the file:\n\t${result.toString()}`);
});
最后的done 调用甚至在流开始运行之前执行,因为parseFileAsync 立即用一个空数组完成;流还没有机会推送任何承诺。
经过几天的搜索,我仍然不确定执行此操作的正确方法是什么。 Node/JavaScript 专家:帮助?
更新
代码已更新,我的承诺现在运行良好。但是,我需要一种方法来连接流并在需要时取消该过程。我还需要一种方法来重试任何失败的操作。
【问题讨论】:
-
我最初的想法(没有过多阅读)是您应该选择一种或另一种范式。这似乎最好由流范式中的
map stream之类的东西处理。如果你必须使用promise(也许消费者期望得到一个promise),只要你的流全部退出就返回一个promise并解决它。 -
创建一个新的 Promise P,它在流完成时完成,将它的 Promise 添加到列表中。返回 P 并让它解析 Promise 列表。等待 P,然后等待所有 P 的结果。
-
@nick 坚持使用流是不可能的。我正在开发的应用程序使用了 Promise。我现在才介绍一个需要读取文件的功能。
-
明白了。我认为您仍然可以在我的评论后半部分使用该方法。例如。只需将流式行为包装在一个 Promise 中即可。
-
感谢您的建议;他们工作得很好!我对我原来的问题做了一些更新;我需要一个钩子来取消和重试。
标签: javascript node.js stream typescript promise