是的,等待锁的任务被添加到队列中,并以 FIFO 为基础唤醒。
具体来说,当尝试获取锁定的锁时,会创建一个 future 来等待锁可用的信号,称为 waiter。这个服务员被添加到collections.deque()双端队列,created in Lock.__init__()
self._waiters = collections.deque()
当锁被当前持有它的任务释放时,Lock._wake_up_first() method 被调用:
def _wake_up_first(self):
"""Wake up the first waiter if it isn't done."""
try:
fut = next(iter(self._waiters))
except StopIteration:
return
# .done() necessarily means that a waiter will wake up later on and
# either take the lock, or, if it was cancelled and lock wasn't
# taken already, will hit this again and wake up a new waiter.
if not fut.done():
fut.set_result(True)
Future.set_result() call 标志着未来已经完成。这究竟是如何导致等待未来重新获得控制权的任务取决于实现,但通常这是通过提供给事件循环的回调函数来完成的,以便尽早调用。
Lock.acquire() method 负责添加和删除期货(因为这是在发出信号后将返回的未来):
fut = self._loop.create_future()
self._waiters.append(fut)
# Finally block should be called before the CancelledError
# handling as we don't want CancelledError to call
# _wake_up_first() and attempt to wake up itself.
try:
try:
await fut
finally:
self._waiters.remove(fut)
except futures.CancelledError:
if not self._locked:
self._wake_up_first()
raise
所以如果锁被锁定,当前任务通过创建一个future对象来等待,该对象被添加到_waiters队列中,等待future。这会阻止任务,直到未来有结果(await fut 直到那时才会返回)。事件循环不会给这个任务任何处理时间。
另一个当前持有锁并释放它的任务将导致来自_waiters 队列的第一个(最长等待)future 有一个结果集,间接导致正在等待该future 的任务再次变为活动状态。当锁释放任务将控制权交还给事件循环(等待其他事情时),事件循环将控制权交给等待该未来的任务,未来返回到await fut行,未来从队列和锁被给予在那个未来等待的任务。
这里有一个Lock.acquire() 方法显式处理的竞争条件情况:
- 任务 A 释放锁,队列持有等待锁的任务 B 的未来。未来已成定局。
- 事件循环将控制权交给第三个任务 C,该任务 C 正在等待不相关但现在再次处于活动状态,并且此任务运行尝试获取锁的代码。
任务 C 不会给予锁,但是,因为在 Lock.acquire() 方法的顶部是这个测试:
if not self._locked and all(w.cancelled() for w in self._waiters):
self._locked = True
return True
not self._locked 在他的情况下是正确的,因为任务 A 已经发布了它。但all(w.cancelled() for w in self._waiters) 不是,因为任务 B 在队列中有一个活动的、未取消的未来。因此,任务 C 将自己的服务员未来添加到队列中。 _waiters 队列中具有活动期货的未锁定锁实际上被视为已锁定。