【问题标题】:python multicore queue randomly hangs for no reason, despite queue size being tinypython多核队列无缘无故地挂起,尽管队列大小很小
【发布时间】:2013-09-03 19:39:13
【问题描述】:

在 python 中,这是我的多处理设置。我将 Process 方法子类化并给了它 用于酸洗/数据目的的队列和其他一些字段。

这种策略在大约 95% 的时间里有效,另外 5% 的原因不明,队列只是挂起并且永远不会完成(通常 4 个核心中的 3 个完成他们的工作,最后一个需要永远,所以我有只是杀死工作)。

我知道队列在 python 中的大小是固定的,否则它们会挂起。我的队列只存储一个字符串...处理器的id,所以不可能是这样的。

这是我的代码停止的确切行:

res = self._recv() 

有人有想法吗?正式代码如下。 谢谢。

from multiprocessing import Process, Queue
from multiprocessing import cpu_count as num_cores
import codecs, cPickle

class Processor(Process):

    def __init__(self, queue, elements, process_num):
        super(Processor, self).__init__()
        self.queue = queue
        self.elements = elements
        self.id = process_num

    def job(self):

        ddd = []

        for l in self.elements:

            obj = ... heavy computation ...

            dd = {}
            dd['data'] = obj.data
            dd['meta'] = obj.meta
        ddd.append(dd)

        cPickle.dump(ddd, codecs.open(
            urljoin(TOPDIR, self.id+'.txt'), 'w'))

        return self.id

    def run(self):
        self.queue.put(self.job())



 if __name__=='__main__':

        processes = []

        for i in range(0, num_cores()):

            q = Queue()
            p = Processor(q, divided_work(), process_num=str(i))
            processes.append((p, q))
            p.start()

        for val in processes:

            val[0].join()
            key = val[1].get() 

            storage = urljoin(TOPDIR, key+'.txt')

            ddd = cPickle.load(codecs.open(storage , 'r'))

            .. unpack ddd process data ...

【问题讨论】:

  • res = self._recv() 看起来不像是错误消息..
  • 这是我的代码所在的行,用词错误
  • res = self._recv() 行不在您显示的代码中。另外,能否再发一次错误信息?
  • 没有错误信息,我的代码只是挂起。当我按 ctrl-c 时,我可以看到它挂在 res = self._recv() 上。
  • 您发布的代码中没有res = self._recv()

标签: python python-2.7 multiprocessing


【解决方案1】:

run() 方法的开头使用 time.sleep(0.001)

【讨论】:

  • 谢谢你的回答,我会做的。但是你能解释一下你的推理吗?
  • 我曾经遇到过类似的问题,它似乎也适用于this guy。不幸的是,我没有更令人满意的解释。
  • 我会回复结果,顺便说一句,这对我来说有点奇怪,因为我的工作方法需要很长时间,比如 5 分钟。你认为暂停一毫秒会改变什么吗?奇怪的!感谢您的回复
  • 我不能确定,只是尝试找出,它可能不起作用,在这种情况下需要更多调查!
  • 哎呀,它仍然挂起:\也许这是我在计算中的逻辑需要永远
【解决方案2】:

根据我的经验

time.sleep(0.001)

远远不够。

我遇到了类似的问题。如果您“过早”在队列中调用 get()put() 似乎会发生这种情况。我猜它以某种方式无法足够快地初始化。不完全确定,但我推测这可能与队列使用底层操作系统传递消息的方式有关。在我开始使用 BeautifulSoup 和 lxml 后它开始发生在我身上,它影响了完全不相关的代码。

我的解决方案有点丑,但很简单,而且很有效:

import time

def run(self):
    error = True
    while error:
        self.queue.put(self.job())
        error = False
    except EOFError:
        print "EOFError. retrying..."
        time.sleep(1)

在我的机器上,它通常在应用程序启动期间重试两次,之后再也不重试。您需要在发送方和接收方内部执行此操作,因为此错误会在双方发生。

【讨论】:

    猜你喜欢
    • 2017-12-24
    • 2019-04-26
    • 2015-02-18
    • 1970-01-01
    • 1970-01-01
    • 2016-07-19
    • 2017-10-21
    • 2014-11-05
    • 1970-01-01
    相关资源
    最近更新 更多