【问题标题】:NodeJS streams not awaiting asyncNodeJS 流不等待异步
【发布时间】:2019-06-30 02:29:18
【问题描述】:

我在测试 NodeJS 流时遇到了问题。在运行 stream.pipeline 后,我似乎无法让我的项目等待 Duplex 和 Transform 流的输出,即使它返回了一个承诺。也许我遗漏了一些东西,但我相信脚本应该等待函数返回,然后再继续。我正在努力开展的项目中最重要的部分是:

// Message system is a duplex (read/write) stream
export class MessageSystem extends Duplex {
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _read(size: number): void {
        var chunk = this.read();
        console.log(`Recieved ${chunk}`);
        this.push(chunk);
    }
    public _write(chunk: Message, encoding: string, 
        callback: (error?: Error | null | undefined, chunk?: Message) => any): void {
        if (chunk.data === null) {
            callback(new Error("Message.Data is null"));
        } else {
            callback();
        }
    }
}

export class SystemStream extends Transform {
    public type: MessageType = MessageType.Global;
    public data: Array<Message> = new Array<Message>();
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _transform(chunk: Message, encoding: string, 
        callback: TransformCallback): void {
        if (chunk.single && (chunk.type === this.type || chunk.type === MessageType.Global)) {
            console.log(`Adding ${chunk}`);
            this.data.push(chunk);
            chunk = new Message(chunk.data, MessageType.Removed, true);
            callback(undefined, chunk); // TODO: Is this correct?
        } else if (chunk.type === this.type || chunk.type === MessageType.Global) { // Ours and global
            this.data.push(chunk);
            callback(undefined, chunk);
        } else { // Not ours
            callback(undefined, chunk);
        }
    }
}

export class EngineStream extends SystemStream {
    public type: MessageType = MessageType.Engine;
}

export class IOStream extends SystemStream {
    public type: MessageType = MessageType.IO;
}

let ms = new MessageSystem();
let es = new EngineStream();
let io = new IOStream();

let pipeline = promisify(Stream.pipeline);

async function start() {
    console.log("Running Message System");
    console.log("Writing new messages");
    ms.write(new Message("Hello"));
    ms.write(new Message("world!"));
    ms.write(new Message("Engine data", MessageType.Engine));
    ms.write(new Message("IO data", MessageType.IO));
    ms.write(new Message("Order matters in the pipe, even if Global", MessageType.Global, true));
    ms.end(new Message("Final message in the stream"));
    console.log("Piping data");
    await pipeline(
        ms,
        es,
        io
    );
}

Promise.all([start()]).then(() => {
    console.log(`Engine Messages to parse: ${es.data.toString()}`);
    console.log(`IO Messages to parse: ${io.data.toString()}`);
});

输出应该类似于:

Running message system
Writing new messages
Hello
world!
Engine Data
IO Data
Order Matters in the pipe, even if Global
Engine messages to parse: Engine Data
IO messages to parse: IO Data

任何帮助将不胜感激。谢谢!

注意:我是用我的其他帐户发布的,而不是我的实际帐户。为重复道歉。

编辑:我最初将 repo 设为私有,但已将其公开以帮助澄清答案。更多用法可以在feature/inital_system branch 上找到。签出时可以使用npm start 运行。

编辑:我把我的自定义流放在这里是为了详细说明。我认为我比以前走得更好,但现在收到了一个“空”对象。

【问题讨论】:

  • 您究竟希望发生什么,以及会发生什么?
  • 你希望new Transform();做什么?
  • 也许我没有分享足够的信息。我会尽快更新完整的脚本文件
  • 我已经更新了更多信息,以及显示此 POC 的 GitHub 项目。谢谢!

标签: node.js typescript ecmascript-6 es6-promise nodejs-stream


【解决方案1】:

正如the documentation 所说,stream.pipeline 是基于回调的,不会返回承诺。

它有自定义的承诺版本,可以通过util.promisify 访问:

const pipeline = util.promisify(stream.pipeline);

...

await pipeline(...);

【讨论】:

  • 最诚挚的歉意,我忘了添加该代码。我的代码中也有它,但它仍然没有运行。我将编辑帖子以更正此问题
  • 该问题似乎没有包含任何可以解释该问题的内容。请提供可以复制问题的stackoverflow.com/help/mcve
  • 我已经更新了预期的输出,以及显示此 POC 的 GitHub 存储库
  • 我查过了。我没有机会弄清楚流类内部发生了什么,但我相信问题是他们特有的。 pipeline 与基本的 createReadStream 和 createWriteStream 一起工作。至于所描述的pipeline 行为,它会导致未决的承诺永远不会解决,所以它只是退出。 pipeline 回调也会有同样的问题。
  • 所以如果我的流没有正确读取/写入数据并让承诺悬而未决,这将是一些问题。我将进一步研究 API 以了解它为什么不更新并报告回来。谢谢!
【解决方案2】:

在过去几天的一些工作之后,我找到了答案。问题是我对双工流的实现。从那以后,我将MessageSystem 更改为更易于管理和使用的转换流。

这是产品:

export class MessageSystem extends Transform {
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _transform(chunk: Message, encoding: string,
        callback: TransformCallback): void {
            try {
                let output: string = chunk.toString();
                callback(undefined, output);
            } catch (err) {
                callback(err);
            }
        }
}

感谢@estus 的快速回复和检查。同样,我一直在 API 中找到我的答案!

我的发现的存档存储库可以在 this repository 中找到。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-07-20
    • 2018-02-08
    • 2020-03-31
    • 2019-08-08
    • 1970-01-01
    • 2021-10-16
    • 2016-07-07
    相关资源
    最近更新 更多