【问题标题】:Asynchronous Bounded Queue in JS/TS using async/awaitJS/TS 中使用 async/await 的异步有界队列
【发布时间】:2018-10-27 04:43:48
【问题描述】:

我正试图绕过async/await,我有以下代码:

class AsyncQueue<T> {
    queue = Array<T>()
    maxSize = 1

    async enqueue(x: T) {
        if (this.queue.length > this.maxSize) {
            // Block until available
        }

        this.queue.unshift(x)
    }

    async dequeue() {
        if (this.queue.length == 0) {
            // Block until available
        }

        return this.queue.pop()!
    }
}

async function produce<T>(q: AsyncQueue, x: T) {
    await q.enqueue(x)
}

async function consume<T>(q: AsyncQueue): T {
    return await q.dequeue()
}

// Expecting 3 4 in the console
(async () => {
    const q = new AsyncQueue<number>()
    consume(q).then(console.log)
    consume(q).then(console.log)
    produce(q, 3)
    produce(q, 4)
    consume(q).then(console.log)
    consume(q).then(console.log)
})()

当然,我的问题在于代码的“阻塞直到可用”部分。我期望能够“停止”执行直到发生某些事情(例如,出队停止直到存在入队,反之亦然,因为有可用空间)。我觉得我可能需要为此使用协程,但我真的想确保我不会在这里错过任何 async/await 魔法。

【问题讨论】:

  • 不想想要block,那会冻结脚本——你应该有enqueuedequeueawait承诺一旦解决,无论他们'正在等待可用。此外,您应该使用() 调用构造函数
  • 看来我正在将async/await 游戏推得越来越远,我不清楚这一切将如何解决。
  • 不,你不需要协程,你只需要 new Promise 构造函数来等待外部发生的事情。但是,您正在实现协程,CSP-style

标签: javascript typescript asynchronous async-await


【解决方案1】:

2019 年 4 月 17 日更新: 长话短说,下面的 AsyncSemaphore 实现中有一个错误,它是使用 property-based 测试捕获的。 You can read all about this "tale" here。这是固定版本:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()!()
    }

    async wait() {
        this.permits -= 1
        if (this.permits < 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
    }
}

最后,经过相当大的努力,并受到@Titian 回答的启发,我想我解决了这个问题。代码中充满了调试消息,但它可能用于教学关于控制流的目的:

class AsyncQueue<T> {
    waitingEnqueue = new Array<() => void>()
    waitingDequeue = new Array<() => void>()
    enqueuePointer = 0
    dequeuePointer = 0
    queue = Array<T>()
    maxSize = 1
    trace = 0

    async enqueue(x: T) {
        this.trace += 1
        const localTrace = this.trace

        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
            console.debug(`[${localTrace}] Producer Waiting`)
            this.dequeuePointer += 1
            await new Promise(r => this.waitingDequeue.unshift(r))
            this.waitingDequeue.pop()
            console.debug(`[${localTrace}] Producer Ready`)
        }

        this.queue.unshift(x)
        console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)

        if (this.enqueuePointer > 0) {
            console.debug(`[${localTrace}] Notify Consumer`)
            this.waitingEnqueue[this.enqueuePointer-1]()
            this.enqueuePointer -= 1
        }
    }

    async dequeue() {
        this.trace += 1
        const localTrace = this.trace

        console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)

        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
            console.debug(`[${localTrace}] Consumer Waiting`)
            this.enqueuePointer += 1
            await new Promise(r => this.waitingEnqueue.unshift(r))
            this.waitingEnqueue.pop()
            console.debug(`[${localTrace}] Consumer Ready`)
        }

        const x = this.queue.pop()!
        console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)

        if (this.dequeuePointer > 0) {
            console.debug(`[${localTrace}] Notify Producer`)
            this.waitingDequeue[this.dequeuePointer - 1]()
            this.dequeuePointer -= 1
        }

        return x
    }
}

更新:这是一个使用 AsyncSemaphore 的干净版本,它真正封装了通常使用并发原语完成的方式,但适应了异步 CPS 单线程事件循环™ 风格的 JavaScript 与 async/await。可以看到AsyncQueue的逻辑变得直观了很多,通过Promises的双重同步委托给了两个semaphores

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()()
    }

    async wait() {
        if (this.permits == 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
        this.permits -= 1
    }
}

class AsyncQueue<T> {
    private queue = Array<T>()
    private waitingEnqueue: AsyncSemaphore
    private waitingDequeue: AsyncSemaphore

    constructor(readonly maxSize: number) {
        this.waitingEnqueue = new AsyncSemaphore(0)
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    }

    async enqueue(x: T) {
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    }

    async dequeue() {
        await this.waitingEnqueue.wait()
        this.waitingDequeue.signal()
        return this.queue.pop()!
    }
}

更新 2: 上面的代码中似乎隐藏了一个微妙的错误,当尝试使用大小为 0 的 AsyncQueue 时,这一点变得很明显。语义确实有意义:它是没有任何缓冲区的队列,发布者总是等待消费者存在。阻止它工作的行是:

await this.waitingEnqueue.wait()
this.waitingDequeue.signal()

如果您仔细观察,您会发现dequeue()enqueue() 并不完全对称。事实上,如果交换这两条指令的顺序:

this.waitingDequeue.signal()
await this.waitingEnqueue.wait()

然后一切都恢复正常;在我看来,在实际等待enqueuing 发生之前,我们会发出信号表明对dequeuing() 感兴趣。

如果没有经过广泛的测试,我仍然不确定这不会重新引入细微的错误。我将把它作为一个挑战;)

【讨论】:

  • 很高兴看到您对答案有所帮助。我赶上了其他事情,没有时间调试它。我删除了原来的答案,因为它正在投票,当之无愧。
猜你喜欢
  • 2017-10-26
  • 2020-03-01
  • 1970-01-01
  • 2018-04-19
  • 2018-02-15
  • 2016-12-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多