【问题标题】:Nodejs Async Promise QueueNodejs 异步承诺队列
【发布时间】:2018-11-03 01:13:24
【问题描述】:

我需要使用速率受限的 API。例如,我在一秒钟内只能进行 10 次 API 调用,所以我需要等待当前秒结束才能进行另一个 API 调用。

为了实现这一点,我想创建一个可以自行管理的异步队列。它的主要功能是让我向队列中添加一个新的 Promise,当 Promise 解决时,应用程序会收到通知:

let queue = new Queue()

queue.add(api.get('/somepath')).then(res => { // handle response });

如何使用普通的 Promise 实现这一点?

export class AsyncQueue {

    private queue: Array<Promise<any>>;


    add(promise, fct) {
        this.queue.push(promise);
    }

    resolveNext() {
        this.queue.pop().then({
            // how to resolve the waiting promise in my application
        })
    }

    get length() {
        return this.queue.length
    }

}

【问题讨论】:

  • 这听起来像是 Observables 的工作。
  • 我认为只使用promise就可以做到吗?
  • 看看你的代码,api.get() 会被立即调用。在队列中被pop()ed 后不应该被调用吗?
  • 为什么不将响应处理承诺链接到 api 调用?
  • 可以用一根针和一只稳定的手来完成,但我通常更喜欢经过实战考验、拥有蓬勃发展的社区的 pubsub,而不是我自己的临时实现。

标签: javascript node.js asynchronous queue es6-promise


【解决方案1】:

在当前实现中,api.get() 将在add 加入队列时立即被调用。你应该add path 代替(或者可能同时使用api.getpath)并让AsyncQueue 在可行时初始化Promise。确保让 add 返回一个 Promise,一旦 API 调用完成,该 Promise 就会解析。

例如,在 vanilla JS 中,它可能如下所示:

const apiGet = () => new Promise(resolve => setTimeout(resolve, 1000));

class AsyncQueue {
  queue = [];
  constructor() {
    setInterval(this.resolveNext.bind(this), 2000);
  }
  add(fn, param) {
    return new Promise(resolve => {
      this.queue.unshift({ fn, param, resolve });
    });
  }
  resolveNext() {
    if (!this.queue.length) return;
    const { fn, param, resolve } = this.queue.pop();
    fn(param).then(resolve);
  }
}


const queue = new AsyncQueue()
console.log('start');
// Will resolve after 2000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res => {
  console.log('handling response 1');
});
// Will resolve after 4000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res => {
  console.log('handling response 2');
});

【讨论】:

  • 这是一个非常好的方法。
  • 我有很多时间等待找到解决我的问题的方法,即在前一个完成时调用异步函数,使用队列。剩下的问题是堆栈,如果异步函数需要更多时间调用者。但这就是生活。
【解决方案2】:

为了避免永久调用resolveNext(),可以这样实现吗?

class AsyncQueue {
  
    /* delayBetween: delay (ms) before calling next item
        */
  constructor( delayBetween) {
        this.queue = [];
        this.id = 0;
        if (delayBetween < 1) {
            delayBetween = 1;
        }
        this.delayBetween = delayBetween;
        this.timer = null;
    // setInterval( this.resolveNext.bind(this), this.delayBetween);
  }
    
  add(fn, param) {
    return new Promise( resolve => {
        // liste inversée : le dernier élément ajouté est au début du tableau
        this.id ++;
        param.queueId = this.id;
        // console.log( `${new Date().yyyymmddhhmmsslll()} > push request: ${JSON.stringify(param)}`);
      this.queue.unshift( { fn, param, resolve } );
        // console.log( `${new Date().yyyymmddhhmmsslll()} > add() > setTimeout...`);
        if (this.timer == null) {
            this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
        }
    });
  }
    
  resolveNext() {
        this.timer = null;
        // console.log( `${new Date().yyyymmddhhmmsslll()} > resolveNext() > called, len: ${this.queue.length}...`);
    if ( ! this.queue.length) return;       
    const { fn, param, resolve } = this.queue.pop();
        // console.log( `${new Date().yyyymmddhhmmsslll()} > pop request: ${JSON.stringify(param)}`);
        // execute fn, and call resolve only when finished
    // fn(param).then(resolve);
    fn(param).then((result) => {
        // console.log( `${new Date().yyyymmddhhmmsslll()} > fn resolved: ${JSON.stringify(result)}`);
        if (this.timer == null) {
            this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
        }
    });
  }
}

【讨论】:

    猜你喜欢
    • 2017-10-22
    • 2019-06-21
    • 1970-01-01
    • 2014-01-04
    • 2018-08-26
    • 1970-01-01
    • 2019-05-15
    • 1970-01-01
    • 2018-06-09
    相关资源
    最近更新 更多