【问题标题】:Determine if worker threads are doing any work确定工作线程是否正在做任何工作
【发布时间】:2014-01-24 18:55:53
【问题描述】:

我编写了一个蜘蛛,它从列表中获取 url,使用 requests 在单独的线程中使用 concurrent.futures.ThreadPoolExecutor 加载相应的页面,当加载页面时,从中提取一些信息,放入 item (字典),然后将 item 放入名为 collected_itemsQueue() 中。

在运行蜘蛛方法后,在单独的线程中为ThreadPoolExecutor 创建作业(简化):

def start_requests(self):

    def start_requests():
        for url in self.start_urls:
            self.start_request(url)

    self._executor = ThreadPoolExecutor(self.max_workers)
    self._executor.submit(start_requests)

我正在等待工作线程收集的项目:

spider = Spider()
spider.start_requests()

while not spider._executor._work_queue.empty() or not collected_items.empty():
    try:
        item = collected_items.get(timeout=0.25)
    except queue.Empty:
        continue
    print('Found an item %s' item)

但有时while 循环会在所有项目被收集之前中断。

spider._executor._threads 是工作线程的set,在while 循环中从spider._executor._work_queue 获取工作项并运行相关的可调用对象。

条件not spider._executor._work_queue.empty() or not collected_items.empty() 不可靠,因为执行程序中的工作项队列以及收集的项队列可能为空,但在检查此条件时,执行程序工作线程可能已从@987654339 获取最后一个工作项@ 并且现在正在做一些工作,将收集的项目添加到 collected_items 队列(目前也是空的)。或者工作项队列还没有收到第一个工作项。

我没有找到可靠的方法来确定我是否仍需要等待新项目出现在 collected_items 中或继续前进。

更新:

如果在完成一个工作项后工作线程会调用work_queue.task_done(),我会解决这个问题。不幸的是it's not the case

我已对相关错误添加了评论:http://bugs.python.org/issue14119#msg207512

【问题讨论】:

  • 似乎是个棘手的问题。顺便说一句,你把def Spider(self):def start_requests(self): 打错了吗?
  • @WKPlus,不,这不是错字。我已经完成了一个闭包以在单独的工作线程中运行内部 start_requests

标签: python multithreading python-3.x threadpool


【解决方案1】:

像这样编写你的工作代码:

def run():
    while True:
        item = work_queue.get()
        work(item)
        work_queue.task_done()

并使用queue.unfinished_tasks作为条件。

【讨论】:

  • 感谢您的评论。翻看concurrent/futures/thread.py的源码我也想到了这个。但我必须修改标准库代码或复制它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-05-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-08-08
相关资源
最近更新 更多