【发布时间】:2014-01-24 18:55:53
【问题描述】:
我编写了一个蜘蛛,它从列表中获取 url,使用 requests 在单独的线程中使用 concurrent.futures.ThreadPoolExecutor 加载相应的页面,当加载页面时,从中提取一些信息,放入 item (字典),然后将 item 放入名为 collected_items 的 Queue() 中。
在运行蜘蛛方法后,在单独的线程中为ThreadPoolExecutor 创建作业(简化):
def start_requests(self):
def start_requests():
for url in self.start_urls:
self.start_request(url)
self._executor = ThreadPoolExecutor(self.max_workers)
self._executor.submit(start_requests)
我正在等待工作线程收集的项目:
spider = Spider()
spider.start_requests()
while not spider._executor._work_queue.empty() or not collected_items.empty():
try:
item = collected_items.get(timeout=0.25)
except queue.Empty:
continue
print('Found an item %s' item)
但有时while 循环会在所有项目被收集之前中断。
spider._executor._threads 是工作线程的set,在while 循环中从spider._executor._work_queue 获取工作项并运行相关的可调用对象。
条件not spider._executor._work_queue.empty() or not collected_items.empty() 不可靠,因为执行程序中的工作项队列以及收集的项队列可能为空,但在检查此条件时,执行程序工作线程可能已从@987654339 获取最后一个工作项@ 并且现在正在做一些工作,将收集的项目添加到 collected_items 队列(目前也是空的)。或者工作项队列还没有收到第一个工作项。
我没有找到可靠的方法来确定我是否仍需要等待新项目出现在 collected_items 中或继续前进。
更新:
如果在完成一个工作项后工作线程会调用work_queue.task_done(),我会解决这个问题。不幸的是it's not the case。
我已对相关错误添加了评论:http://bugs.python.org/issue14119#msg207512
【问题讨论】:
-
似乎是个棘手的问题。顺便说一句,你把
def Spider(self):和def start_requests(self):打错了吗? -
@WKPlus,不,这不是错字。我已经完成了一个闭包以在单独的工作线程中运行内部
start_requests。
标签: python multithreading python-3.x threadpool