【问题标题】:Python3 multiprocessing shared objectPython3 多处理共享对象
【发布时间】:2023-04-03 04:54:02
【问题描述】:

在 Python 3.2.3(在 Debian 7.5 上)中使用 multiprocessing 模块时,我偶然发现了共享对象的同步问题。我把这个简单的例子放在一起来说明问题,它的功能类似于multiprocessing.Pool.map(我能想到的最简单的)。我正在使用multiprocessing.Manager,因为我的原始代码使用它(通过网络同步)。但是如果我使用简单的multiprocessing.Value 作为计数器变量,行为是相同的。

import os as os
import sys as sys
import multiprocessing as mp

def mp_map(function, obj_list, num_workers):
    """ 
    """
    mang = mp.Manager()
    jobq = mang.Queue()
    resq = mang.Queue()
    counter = mp.Value('i', num_workers, lock=True)
    finished = mang.Event()
    processes = []
    try:
        for i in range(num_workers):
            p = mp.Process(target=_parallel_execute, kwargs={'execfun':function, 'jobq':jobq, 'resq':resq, 'counter':counter, 'finished':finished})
            p.start()
            p.join(0)
            processes.append(p)
        for item in obj_list:
            jobq.put(item)
        for i in range(len(processes)):
            jobq.put('SENTINEL')
        finished.wait()
        for p in processes:
            if p.is_alive():
                p.join(1)
                p.terminate()
    except Exception as e:
        for p in processes:
            p.terminate()
        raise e
    results = []
    for item in iter(resq.get, 'DONE'):
        results.append(item)
    return results

def _parallel_execute(execfun, jobq, resq, counter, finished):
    """
    """
    for item in iter(jobq.get, 'SENTINEL'):
        item = execfun(item)
        resq.put(item)
    counter.value -= 1
    print('C: {}'.format(counter.value))
    if counter.value <= 0:
        resq.put('DONE')
        finished.set()
    return


if __name__ == '__main__':
    l = list(range(50))
    l = mp_map(id, l, 2)
    print('done')
    sys.exit(0)

多次运行上述代码会导致以下结果:

wks:~$ python3 mpmap.py 
C: 1
C: 0
done
wks:~$ python3 mpmap.py 
C: 1
C: 0
done
wks:~$ python3 mpmap.py 
C: 1
C: 1
Traceback (most recent call last):
  File "mpmap.py", line 55, in <module>
    l = mp_map(id, l, 2)
  File "mpmap.py", line 25, in mp_map
    finished.wait()
  File "/usr/lib/python3.2/multiprocessing/managers.py", line 1013, in wait
    return self._callmethod('wait', (timeout,))
  File "/usr/lib/python3.2/multiprocessing/managers.py", line 762, in _callmethod
    kind, result = conn.recv()
KeyboardInterrupt

根据multiprocessing 模块的文档,我不明白为什么counter 不是进程安全的,因为对它的访问是通过Manager 管理的,并且它显然是用lock=True 初始化的。由于死锁只是偶尔发生,我不确定如何解释这种行为。非常感谢任何有用的见解,谢谢。

编辑: 碰巧我在谷歌搜索后找到了一个解释;如果其他人有兴趣,我将在这里分享:基于下面链接的此博客条目1,在 Python 中完成的锁定(即在 multiprocessing.[Manager].Valuelock=True 中)不会导致对共享值的原子操作,如例子。解决方案是在进程之间使用另一个共享锁,用于控制对共享对象的访问。

[http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/]

【问题讨论】:

  • counter.value -= 1 不是原子的,尽管lock=True。看到这个答案:stackoverflow.com/a/1233363/3826372
  • 感谢您的评论和链接;但是,通过谷歌搜索找到了答案(请参阅我的编辑)
  • 是的,我知道。您在我发表评论的同时进行了编辑。在任何情况下,您都不应该通过编辑问题来回答自己的问题。你应该发布一个答案并接受它。

标签: python multiprocessing shared-memory python-3.2


【解决方案1】:

按照罗斯的建议,我在这里重复答案: 简而言之,lock=True 用于 multiprocessing.Valuemultiprocessing.Manager.Value 不会使例如一个原子操作的值的递增(递减)——需要一个单独的锁来封装整个操作;有关代码示例,请参阅此答案 https://stackoverflow.com/a/1233363/3826372 或上述博客条目 http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

【讨论】:

    猜你喜欢
    • 2017-02-07
    • 2012-05-30
    • 2020-11-13
    • 2014-01-24
    • 1970-01-01
    • 2011-04-12
    • 2011-07-27
    • 1970-01-01
    相关资源
    最近更新 更多