【问题标题】:How to convert Node.js async streaming callback into an async generator?如何将 Node.js 异步流回调转换为异步生成器?
【发布时间】:2018-11-24 12:56:08
【问题描述】:

我有一个通过回调批量流式传输数据的函数。

每个批次将在获取另一个批次之前等待回调函数,并且整个函数返回一个承诺,当所有批次完成时解析。

(我使用 TypeScript 注释来帮助提高可读性)

async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}

如何将此函数转换为一次生成一个值的异步生成器?

async function* generatorStream(): AsyncIterableIterator<number> {}

事实证明,这是一项相当艰巨的任务。

我已经解决了这个问题,我已经构建了一些可行的东西,但它非常复杂,我无法证明合并这段代码并让我团队中的其他人处理它是合理的。


这是我当前的实现:

我正在使用这个帮助函数,它创建了一个“延迟”承诺,有助于在回调中传递承诺。

interface DeferredPromise<T> {
    resolve: (value: T) => void
    reject: (error: any) => void
    promise: Promise<T>
}

function deferred<T>(): DeferredPromise<T> {
    let resolve
    let reject
    const promise = new Promise<T>((res, rej) => {
        resolve = res
        reject = rej
    })
    return {
        resolve: resolve as (value: T) => void,
        reject: reject as (error: any) => void,
        promise,
    }
}

接下来,我有一个逻辑毛球,它将 Promise 回调线性化成一个链,其中每个 Promise 用 next 函数解析一个批处理,该函数将返回另一个 Promise 获取下一个批处理。

type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done

async function chainedPromises() {
    let deferred = PromiseUtils.deferred<Result>()

    callbackStream(async batch => {
        const next = PromiseUtils.deferred<null>()
        deferred.resolve({
            done: false,
            value: batch,
            next: () => {
                deferred = PromiseUtils.deferred<Result>()
                next.resolve(null)
                return deferred.promise
            },
        })
        await next.promise
    }).then(() => {
        deferred.resolve({ done: true })
    })

    return deferred.promise
}

从这里开始,创建一个一次生成一个项目的生成器并不是很困难:

async function* generatorStream(): AsyncIterableIterator<number> {
    let next = chainedPromises
    while (true) {
        const result = await next()
        if (result.done) {
            return
        }
        for (const item of result.value) {
            yield item
        }
        next = result.next
    }
}

我想我们都同意中间的chainedPromises 函数非常令人困惑和复杂。 有什么方法可以将callbackStream 转换为generatorStream,并且易于理解和遵循?我不介意使用已建立的库,但我会也很欣赏从第一原则出发的简单实现。

【问题讨论】:

  • 是的,看起来您的代码在这里过于复杂了,.. 如果您使用生成器,为什么还要回调或延迟?查看您的代码,试图弄清楚您想要实现的目标有点棘手。如果它是一个生成块的流,那么那是一个异步生成器将会发光..
  • "每个批次都将等待回调函数" - 所以 lib 已经理解了 Promise?你能展示一下它是如何实现的吗?
  • 我猜callbackStream 实际上是以一种更适合返回迭代器而不是接受回调的方式实现的,因此更改实现比以某种方式包装它要简单得多以适应迭代器接口。
  • 嘿@Bergi,你是完全正确的。 2小时后,一切顺利。节点流、生成器和整个异步可迭代迭代器抽象......

标签: javascript node.js typescript async-await generator


【解决方案1】:

你需要一个事件桶,这里是一个例子:

function bucket() {
  const stack = [],
                iterate = bucket();
    
  var next;
  
  async function * bucket() {
        while (true) {
            yield new Promise((res) => {
                if (stack.length > 0) {
                    return res(stack.shift());
                }

                next = res;
            });
        }
  }  
  
  iterate.push = (itm) => {
    if (next) {
      next(itm);
      next = false;
      return;
    }

    stack.push(itm);
  }  
  
  return iterate;
}

;(async function() {
  let evts = new bucket();

  setInterval(()=>{
    evts.push(Date.now());
    evts.push(Date.now() + '++');
  }, 1000);

  for await (let evt of evts) {
    console.log(evt);
  }
})();

【讨论】:

【解决方案2】:

如果有打字稿解决方案会起作用吗?

当回调被调用时它应该处理条件,然后承诺被解决几次。 回调可以是具有此签名callback(error, result, index) 的方法 它设置为在不带参数调用回调时完成。 用法:

asAsyncOf(this.storage, this.storage.each);

解决方案:

function asAsyncOf<T1, T2, T3, T4, Y>(c, fn: { (a: T1, a1: T2, a2: T3, a3: T4, cb: { (err?, res?: Y, index?: number): boolean }): void }, a: T1, a1: T2, a2: T3, a3: T4): AsyncGenerator<Y>
function asAsyncOf<T1, T2, T3, Y>(c, fn: { (a: T1, a1: T2, a2: T3, cb: { (err?, res?: Y, index?: number): boolean }): void }, a: T1, a1: T2, a3: T3): AsyncGenerator<Y>
function asAsyncOf<T1, T2, Y>(c, fn: { (a: T1, a1: T2, cb: {(err?, res?: Y, index?: number): boolean}): void}, a: T1, a1: T2): AsyncGenerator<Y>
function asAsyncOf<T, Y>(c, fn: { (a: T, cb: { (err?, res?: Y, index?: number): boolean }): void }, a: T): AsyncGenerator<Y>
function asAsyncOf<Y>(c, fn: { (cb: {(err?, res?: Y, index?: number): boolean}): void}): AsyncGenerator<Y>
async function* asAsyncOf(context, fn, ...args) {
    let next = (result?) => { };
    let fail = (err) => { };
    let finish = {};
    const items = [];
    let started = true;
    try {
        fn.apply(context, [...args, function (err, result, index) {
            const nextArgs = [].slice.call(arguments, 0);
            if (nextArgs.length === 0) {
                started = false;
                next(finish);
                return true;
            }
            if (err) {
                fail(err);
                return true;
            }
            items.push(result);
            next(result);
        }]);
    } catch (ex) {
        fail(ex);
    }
    while (true) {
        const promise = started ? new Promise((resolve, error) => {
            next = resolve;
            fail = error;
        }) : Promise.resolve(finish);
        const record = await promise;
        if (record === finish) {
            while (items.length) {
                const item = items.shift();
                yield item;
            }
            return;
        }
        while (items.length) {
            const item = items.shift();
            yield item;
        }
    }
}
export { asAsyncOf };

【讨论】:

    【解决方案3】:

    不,我认为没有一种方法可以以易于理解和遵循的方式实现这种转换。但是,我建议放弃deferreds(无论如何你永远不会rejecting),而只使用promise 构造函数。另外我宁愿马上实现一个异步生成器。

    function queue() {
        let resolve = () => {};
        const q = {
            put() {
                resolve();
                q.promise = new Promise(r => { resolve = r; });
            },
            promise: null,
        }
        q.put(); // generate first promise
        return q;
    }
    function toAsyncIterator(callbackStream) {
        const query = queue();
        const result = queue();
        const end = callbackStream(batch => {
            result.put(batch);
            return query.promise;
        }).then(value => ({value, done: true}));
        end.catch(e => void e); // prevent unhandled promise rejection warnings
        return {
            [Symbol.asyncIterator]() { return this; },
            next(x) {
                query.put(x);
                return Promise.race([
                    end,
                    result.promise.then(value => ({value, done:false})
                ]);
            }
        }
    }
    async function* batchToAsyncIterator(batchCallbackStream) {
        for await (const batch of toAsyncIterator(batchCallbackStream)) {
            // for (const val of batch) yield val;
            // or simpler:
            yield* batch;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2015-05-30
      • 1970-01-01
      • 2017-10-22
      • 2019-08-17
      • 2012-06-03
      • 2017-03-20
      • 2010-12-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多