【问题标题】:Wrap a resultset callback function with a generator/iterator使用生成器/迭代器包装结果集回调函数
【发布时间】:2019-01-29 14:27:05
【问题描述】:

我正在努力将基于回调的旧 API 转换为异步库。但我就是无法将“结果集”作为生成器(节点 10.x)工作。

原来的 API 是这样工作的:

api.prepare((err, rs) => {
    rs.fetchRows(
        (err, row) => {
            // this callback is called as many times as rows exist
            console.log("here's a row:", row);
        },
        () => {
            console.log("we're done, data exausted");
        }
    );
});

但这是我想使用它的方式:

const wrapped = new ApiWrapper(api);
const rs = await wrapped.prepare({});
for (let row of rs.rows()) {
    console.log("here's a row:", row);
}

let row;
while(row = await rs.next()) {
    console.log("here's a row:", row);
}

我以为我可以使用生成器来控制它,但看起来你不能在回调中使用 yield。如果你仔细想想,这实际上似乎是合乎逻辑的。

class ApiWrapper {
    constructor(api) {
        this.api = api;
    }
    prepare() {
        return new Promise((resolve, reject) => {
            this.api.prepare((err, rs) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(rs);
                }
            });
        });
    }
    *rows() {
        this.api.fetchRows((err, row) => {
            if (err) {
                throw err;
            } else {
                yield row; // nope, not allowed here
            }
        });
    }
    next() { ... }
}

那么我有什么选择呢?

重要提示:我不想在数组中存储任何内容然后对其进行迭代,我们在这里讨论的是千兆负载的行数据。

编辑 我可以使用stream.Readable 模拟我想要的行为,但它警告我这是一个实验性功能。这是我尝试使用stream 解决的基于数组的简化版本:

const stream = require('stream');
function gen(){
    const s = new stream.Readable({
        objectMode: true,
        read(){
            [11, 22, 33].forEach(row => {
                this.push({ value: row });
            });
            this.push(null)
        }
    });
    return s;
}

for await (let row of gen()) {
    console.log(row);
}

// { value: 11 }
// { value: 22 }
// { value: 33 }

(node:97157) ExperimentalWarning: Readable[Symbol.asyncIterator] is an experimental feature. This feature could change at any time

【问题讨论】:

  • 生成器不是异步的。您在寻找AsyncIterator 吗?
  • 不确定,但它可以工作。我去看看,谢谢。
  • AsyncIterator 可能是要走的路,但我仍然不明白如何从回调中返回迭代器或生成器。我需要像可迭代的Promise 这样的东西,它可以为到达回调的每一行数据实现。我可以接近这一点是通过返回一个流(它们是可迭代的),每个回调都会将行推入。
  • 迭代器的每个next 调用都需要为下一行创建一个promise - 您可能需要为此创建一个解析器队列。
  • 我想我已经找到了答案,@bergi 关于“队列”的说法我尝试了一个不起作用的队列实现,然后意识到我需要像 Go 的通道这样可以与 async/await 一起使用的东西.这个精彩的模块做到了:npmjs.com/package/@nodeguy/channel。我会尽快发布答案。

标签: javascript node.js ecmascript-6


【解决方案1】:

我终于意识到我需要类似于 Go 的频道,并且与 async/await 兼容。基本上答案是同步一个异步迭代器和一个回调,让它们在 next() 迭代被消耗时相互等待。

我发现的最佳(Node)native 解决方案是使用 stream 作为迭代器,它在 Node 10.x 中受支持,但标记为实验性的。我还尝试使用p-defer NPM 模块来实现它,但结果比我预期的要复杂。终于碰到了https://www.npmjs.com/package/@nodeguy/channel 模块,这正是我所需要的:

const Channel = require('@nodeguy/channel');

class ApiWrapper {
    // ...
    rows() {
        const channel = new Channel();
        const iter = {
            [Symbol.asyncIterator]() {
                return this;
            },
            async next() {
                const val = await channel.shift();
                if (val === undefined) {
                    return { done: true };
                } else {
                    return { done: false, value: val };
                }
            }
        };

        this.api.fetchRows(async (err, row) => {
            await channel.push(row);
        }).then(() => channel.close());

        return iter;
    }
}

// then later

for await (let row of rs.rows()) {
     console.log(row)
}

注意每个迭代函数核心next()rows() 有一个await 来限制可以通过通道推送的数据量,否则生产回调最终可能会无法控制地将数据推送到通道队列中.这个想法是回调应该等待数据被迭代器next()消费,然后再推送更多。

这是一个更独立的例子:

const Channel = require('@nodeguy/channel');

function iterating() {
    const channel = Channel();

    const iter = {
        [Symbol.asyncIterator]() {
            return this;
        },
        async next() {
            console.log('next');
            const val = await channel.shift();
            if (val === undefined) {
                return { done: true };
            } else {
                return { done: false, value: val };
            }
        }
    };

    [11, 22, 33].forEach(async it => {
        await channel.push(it);
        console.log('pushed', it);
    });

    console.log('returned');
    return iter;
}

(async function main() {
    for await (let it of iterating()) {
        console.log('got', it);
    }
})();

/*
returned
next
pushed 11
got 11
next
pushed 22
got 22
next
pushed 33
got 33
next
*/

就像我说的,可以使用 Streams 和/或 Promises 来实现这一点,但 Channel 模块解决了一些复杂性,使其更直观。

【讨论】:

  • 我认为 await 调用 channel.push(…) 没有意义,无论是在 fetchRows 还是 forEach 回调中。两者都将忽略回调返回的任何承诺,它们不支持背压。
  • channel.push()fetchRows 中的await 保证next()相同的顺序 处理结果。如果回调调用者等待回调响应,它还可以防止背压。在forEach() 中,只需滚动到代码下方的输出即可看到它同步工作。如果你去掉forEach()中的await,数组项会在第一时间推送到channel,然后迭代器就会运行。
  • "如果回调调用者等待回调响应" - 是的,只有当调用者这样做时。但是fetchRows 似乎没有承诺意识。 (如果是这样,您可以比使用该通道更容易实现异步迭代器)。不,它与订单无关。
  • "如果您删除 forEach() 中的 await,则数组项将在第一时间推送到频道" - 它们总是如此,因为 @ 987654347@ 忽略承诺并同步迭代数组。唯一的区别是await 导致“推送”日志是异步的——push() 调用本身会立即发生。尝试在push() 调用之前记录“going to push”。您可以使用for await (const x of iterating()) { await new Promise(res => setTimeout(res, 100)); console.log('got', x); } 模拟实际背压。
  • 你是对的,它们正在被加载到通道内的数据结构中。所以回到第一个问题......当回调、迭代器、生成器和流都存在于语言中并且应该很容易将回调转换为迭代而不在内存中排队时,这是多么困难。
猜你喜欢
  • 2014-11-12
  • 1970-01-01
  • 2020-05-08
  • 2017-03-07
  • 2014-09-17
  • 1970-01-01
  • 2020-03-10
  • 2023-03-20
  • 2015-06-24
相关资源
最近更新 更多