【发布时间】:2018-11-24 12:56:08
【问题描述】:
我有一个通过回调批量流式传输数据的函数。
每个批次将在获取另一个批次之前等待回调函数,并且整个函数返回一个承诺,当所有批次完成时解析。
(我使用 TypeScript 注释来帮助提高可读性)
async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}
如何将此函数转换为一次生成一个值的异步生成器?
async function* generatorStream(): AsyncIterableIterator<number> {}
事实证明,这是一项相当艰巨的任务。
我已经解决了这个问题,我已经构建了一些可行的东西,但它非常复杂,我无法证明合并这段代码并让我团队中的其他人处理它是合理的。
这是我当前的实现:
我正在使用这个帮助函数,它创建了一个“延迟”承诺,有助于在回调中传递承诺。
interface DeferredPromise<T> {
resolve: (value: T) => void
reject: (error: any) => void
promise: Promise<T>
}
function deferred<T>(): DeferredPromise<T> {
let resolve
let reject
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return {
resolve: resolve as (value: T) => void,
reject: reject as (error: any) => void,
promise,
}
}
接下来,我有一个逻辑毛球,它将 Promise 回调线性化成一个链,其中每个 Promise 用 next 函数解析一个批处理,该函数将返回另一个 Promise 获取下一个批处理。
type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done
async function chainedPromises() {
let deferred = PromiseUtils.deferred<Result>()
callbackStream(async batch => {
const next = PromiseUtils.deferred<null>()
deferred.resolve({
done: false,
value: batch,
next: () => {
deferred = PromiseUtils.deferred<Result>()
next.resolve(null)
return deferred.promise
},
})
await next.promise
}).then(() => {
deferred.resolve({ done: true })
})
return deferred.promise
}
从这里开始,创建一个一次生成一个项目的生成器并不是很困难:
async function* generatorStream(): AsyncIterableIterator<number> {
let next = chainedPromises
while (true) {
const result = await next()
if (result.done) {
return
}
for (const item of result.value) {
yield item
}
next = result.next
}
}
我想我们都同意中间的chainedPromises 函数非常令人困惑和复杂。 有什么方法可以将callbackStream 转换为generatorStream,并且易于理解和遵循?我不介意使用已建立的库,但我会也很欣赏从第一原则出发的简单实现。
【问题讨论】:
-
是的,看起来您的代码在这里过于复杂了,.. 如果您使用生成器,为什么还要回调或延迟?查看您的代码,试图弄清楚您想要实现的目标有点棘手。如果它是一个生成块的流,那么那是一个异步生成器将会发光..
-
"每个批次都将等待回调函数" - 所以 lib 已经理解了 Promise?你能展示一下它是如何实现的吗?
-
我猜
callbackStream实际上是以一种更适合返回迭代器而不是接受回调的方式实现的,因此更改实现比以某种方式包装它要简单得多以适应迭代器接口。 -
嘿@Bergi,你是完全正确的。 2小时后,一切顺利。节点流、生成器和整个异步可迭代迭代器抽象......
标签: javascript node.js typescript async-await generator