【问题标题】:How do I aggregate promises generated from async functions within a Node.js stream callback?如何在 Node.js 流回调中聚合从异步函数生成的 Promise?
【发布时间】:2016-06-01 10:48:57
【问题描述】:

我有一个 Node.js Typescript 程序,我在其中尝试逐行解析大型 CSV 文件并异步处理这些行。更具体地说,我需要一个函数:

  1. 打开一个 CSV 文件。
  2. 将下一行解析为对象。
  3. (理想情况下)收集一定数量的对象以进行批处理。
  4. 将对象传递给异步函数进行处理(返回承诺)。
  5. 从处理函数中收集 Promise。

一些要求和注意事项:

  • 我需要对这些承诺中的任何一个进行投票以了解进展情况。
  • 假设这些 CSV 文件很大;逐行流式传输是必要的。
  • 我不应该在这些处理操作正在运行时阻止应用程序。
  • 返回一组 promise 可能不是正确的方法,尤其是当我尝试读取包含一百万行的文件时。
  • 我需要各种钩子来取消或重试失败的操作。

这是我已经开始工作的一些测试代码。 ObjectStream 是一个自定义 Node.js 转换,可将 CSV 行转换为对象。

function parseFileAsync(filePath: string): Promise<any> {
    var doParseFileAsync = (filePath: string) => {
        var streamDeferred = q.defer<Promise<any>[]>();
        var promises: Promise<any>[] = [];
        var propertyNames: string[] = [];

        var stream = fs.createReadStream(filePath, { encoding: "utf8" })
            .pipe(new LineStream({ objectMode: true }))
            .pipe(new ObjectStream({ objectMode: true }));

        stream.on("readable", () => {
            var obj: Object;
            while ((obj = stream.read()) !== null) {
                console.log(`\nRead an object...`);

                var operationDeferred = q.defer<any>();
                operationDeferred.resolve(doSomethingAsync(obj));
                promises.push(operationDeferred.promise);
            }
        });
        stream.on("end", () => {
            streamDeferred.resolve(promises);
        });

        return streamDeferred.promise;
    }

    return doParseFileAsync(filePath)
        .then((result: Promise<any>[]) => {
            return q.all(result);
        });
}
parseFileAsync(filePath)
    .done((result: any[]) => {
        console.log(`\nFinished reading and processing the file:\n\t${result.toString()}`);
    });

最后的done 调用甚至在流开始运行之前执行,因为parseFileAsync 立即用一个空数组完成;流还没有机会推送任何承诺。

经过几天的搜索,我仍然不确定执行此操作的正确方法是什么。 Node/JavaScript 专家:帮助?

更新

代码已更新,我的承诺现在运行良好。但是,我需要一种方法来连接流并在需要时取消该过程。我还需要一种方法来重试任何失败的操作。

【问题讨论】:

  • 我最初的想法(没有过多阅读)是您应该选择一种或另一种范式。这似乎最好由流范式中的map stream 之类的东西处理。如果你必须使用promise(也许消费者期望得到一个promise),只要你的流全部退出就返回一个promise并解决它。
  • 创建一个新的 Promise P,它在流完成时完成,将它的 Promise 添加到列表中。返回 P 并让它解析 Promise 列表。等待 P,然后等待所有 P 的结果。
  • @nick 坚持使用流是不可能的。我正在开发的应用程序使用了 Promise。我现在才介绍一个需要读取文件的功能。
  • 明白了。我认为您仍然可以在我的评论后半部分使用该方法。例如。只需将流式行为包装在一个 Promise 中即可。
  • 感谢您的建议;他们工作得很好!我对我原来的问题做了一些更新;我需要一个钩子来取消和重试。

标签: javascript node.js stream typescript promise


【解决方案1】:

我在程序架构中遇到了一些限制,这些限制不允许我随意传递 Promise。因此,与其启动一堆承诺,我决定等到上一批完成后再开始新的承诺。这是我采取的方法:

  1. 将流的东西分离到它自己的接受延续标记的函数中。如果有更多数据要读取,返回值将包含读取的数据以及延续令牌:

    function readFile(filepath: string, lines: number, start: any): Promise<any> {
        ...
    }
    
  2. 定义一个将运行可重试操作的函数。在此函数的主体中,从文件中检索和处理一大块数据。如果结果有延续记号,“递归”再次调用操作函数:

    function processFile(filepath: string, next: any): Promise<any> {
        var chunkSize = 1;
        return readLines(filepath, chunkSize, next)
            .then((result) => {
                // Do something with `result.lines`
                ...
                if (result.next) {
                    return parseFile(filepath, result.next);
                }
            });
    }
    

瞧!一个长时间运行的操作,对块进行操作并且很容易报告进度。

【讨论】:

    猜你喜欢
    • 2018-11-24
    • 2016-05-20
    • 2015-01-22
    • 2023-03-08
    • 2016-03-08
    • 2020-02-10
    • 1970-01-01
    • 2020-06-04
    • 2019-01-20
    相关资源
    最近更新 更多