【发布时间】:2015-02-10 16:18:32
【问题描述】:
作为 Python 新的 asyncio 模块的实验,我创建了以下 sn-p 以在后台工作程序中处理一组长时间运行的操作(作业)。
为了控制同时运行的作业的数量,我在 with 块 中引入了一个信号量(第 56 行)。但是,使用信号量,似乎永远不会释放获取的锁,因为完成后(执行回调)等待的作业不会启动。当我放弃 with 块 时,一切都按预期工作。
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
def __init__(self):
super().__init__()
self._keep_running = True
self._waiting_coros = Queue()
self._tasks = []
self._loop = None # Loop must be initialized in child thread.
self.limit_simultaneous_processes = asyncio.Semaphore(2)
def stop(self):
self._keep_running = False
def run(self):
self._loop = asyncio.new_event_loop() # Implicit creation of the loop only happens in the main thread.
asyncio.set_event_loop(self._loop) # Since this is a child thread, we need to do in manually.
self._loop.run_until_complete(self.process_coros())
def submit_coro(self, coro, callback=None):
self._waiting_coros.put((coro, callback))
@asyncio.coroutine
def process_coros(self):
while self._keep_running:
try:
while True:
coro, callback = self._waiting_coros.get_nowait()
task = asyncio.async(coro())
if callback:
task.add_done_callback(callback)
self._tasks.append(task)
except Empty as e:
pass
yield from asyncio.sleep(3) # sleep so the other tasks can run
background_worker = BackgroundWorker()
class Job(object):
def __init__(self, idx):
super().__init__()
self._idx = idx
def process(self):
background_worker.submit_coro(self._process, self._process_callback)
@asyncio.coroutine
def _process(self):
with (yield from background_worker.limit_simultaneous_processes):
print("received processing slot %d" % self._idx)
start = datetime.now()
yield from asyncio.sleep(2)
print("processing %d took %s" % (self._idx, str(datetime.now() - start)))
def _process_callback(self, future):
print("callback %d triggered" % self._idx)
def main():
print("starting worker...")
background_worker.start()
for idx in range(10):
download_job = Job(idx)
download_job.process()
command = None
while command != "quit":
command = input("enter 'quit' to stop the program: \n")
print("stopping...")
background_worker.stop()
background_worker.join()
if __name__ == '__main__':
main()
谁能帮我解释一下这种行为?为什么清除with块时信号量不增加?
【问题讨论】:
-
我没有看到您的担忧:您的代码示例中的信号量计数按预期减少和增加。
-
你运行代码了吗?如果我这样做,前两个作业的回调将被执行,但循环中的其他协程不会开始执行。我假设他们仍在等待从信号量获取锁,但我在这里可能错了。也许他们在等待别的东西?
标签: multithreading semaphore coroutine event-loop python-asyncio