【问题标题】:python multiprocessing hanging, potential queue memory error?python多处理挂起,潜在的队列内存错误?
【发布时间】:2012-12-30 01:33:54
【问题描述】:

我最近发布了一个问题Using multiprocessing for finding network paths,很高兴@unutbu 提供了一个简洁的解决方案

然而,我在执行test_workers()(使用多处理)函数时遇到了困难。代码运行,但在我的网络G 中有大量节点N 挂起

使用 Mac OS X Lion 10.7.5 -- python 2.7 运行,当 N>500 时挂起。 logging 带来以下消息,之后它会挂起

[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()

通过 VMware fusion 在 Windows 7 上运行有助于更大的网络,但最终会在 N> 20,000 个节点周围出现图表(理想情况下,我希望在 N = 500,000 的网络上使用它)。悬挂点来自窗户一侧的消息:

[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()[DEBUG/MainProcess] telling queue thread to quit
Traceback (most recent call last):
      File "C:\Users\Scott\Desktop\fp_test.py", line 75, in <module>
    Traceback (most recent call last):
          File "C:\Python27\lib\multiprocessing\queues.py", line 264, in _feed
    test_workers()
    MemoryError

我想知道是否有人对为什么会发生这种情况有任何想法?以及是否有任何关于如何使其适用于更大网络的建议?

非常感谢您提出的任何建议。

@unutbu 的代码:

import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)


def worker(inqueue, output):
    result = []
    count = 0
    for pair in iter(inqueue.get, sentinel):
        source, target = pair
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))
    output.put(result)

def test_workers():
    result = []
    inqueue = mp.Queue()
    for source, target in IT.product(sources, targets):
        inqueue.put((source, target))
    procs = [mp.Process(target = worker, args = (inqueue, output))
             for i in range(mp.cpu_count())]
    for proc in procs:
        proc.daemon = True
        proc.start()
    for proc in procs:    
        inqueue.put(sentinel)
    for proc in procs:
        result.extend(output.get())
    for proc in procs:
        proc.join()
    return result

def test_single_worker():
    result = []
    count = 0
    for source, target in IT.product(sources, targets):
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))

    return result

sentinel = None

seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()

if __name__ == '__main__':
    test_workers()
    # test_single_worker()
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))

【问题讨论】:

    标签: python queue multiprocessing


    【解决方案1】:

    您遇到了logging 模块的死锁。

    该模块保留了一些线程锁以允许跨线程进行安全日志记录,但是当当前进程被分叉时它不能很好地发挥作用。例如,请参阅 here 以了解正在发生的事情。

    解决方案是删除logging 调用或改用普通的prints。

    无论如何,作为一般规则,避免使用线程+分叉。并始终检查哪些模块在幕后使用线程。

    请注意,在 windows 上它可以正常工作,因为 windows 没有fork,因此不会出现锁定克隆和后续死锁的问题。 在这种情况下,MemoryError 表示该进程正在消耗过多的 RAM。 您可能需要重新考虑算法以使用更少的 RAM,但这与您在 OSX 上遇到的问题完全不同

    【讨论】:

    • 删除日志调用后,我仍然发现代码挂在 OSX 上。你认为任何其他模块保持线程锁定? (我怎么能发现呢?) - 另外;你知道我使用的队列的大小是否有限制吗?
    • @scott_ouce 搜索了一下,我在multiprocessing 跟踪的问题中发现了这个issue。它可能与您的问题有关,因为它与 MacOSX 和 mp.Queue 有关。无论如何,这对我来说似乎真的很奇怪,因为我在您的代码中看不到可能存在弱引用问题。
    猜你喜欢
    • 1970-01-01
    • 2021-04-29
    • 2014-10-15
    • 2012-04-03
    • 1970-01-01
    • 1970-01-01
    • 2016-01-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多