【问题标题】:Using defaultdict with multiprocessing?将 defaultdict 与多处理一起使用?
【发布时间】:2012-03-04 15:05:12
【问题描述】:

只是实验和学习,我知道如何创建一个可以通过多个进程访问的共享字典,但我不确定如何保持字典同步。我相信defaultdict 说明了我遇到的问题。

from collections import defaultdict
from multiprocessing import Pool, Manager, Process

#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
    d[k] += 1

print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)]
print '*'*10, ' with multiprocessing ', '*'*10

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = Manager()
    multi_d = mgr.dict()
    for k in s:
        pool.apply_async(test, (k, multi_d))

    # Mark pool as closed -- no more tasks can be added.
    pool.close()

    # Wait for tasks to exit
    pool.join()

    # Output results
    print multi_d.items()  #FAIL

print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10
def test2(k, multi_dict2):
    multi_dict2[k] += 1


if __name__ == '__main__':
    manager = Manager()

    multi_d2 = manager.dict()
    for k in s:
        p = Process(target=test2, args=(k, multi_d2))
    p.start()
    p.join()

    print multi_d2 #FAIL

第一个结果有效(因为它没有使用multiprocessing),但我无法让它与multiprocessing 一起使用。我不知道如何解决它,但我认为可能是因为它没有被同步(并在稍后加入结果)或者可能是因为在 multiprocessing 内我无法弄清楚如何将 defaultdict(int) 设置为字典。

任何有关如何使其工作的帮助或建议都会很棒!

【问题讨论】:

    标签: python multiprocessing defaultdict


    【解决方案1】:

    您可以继承BaseManager 并注册其他类型以进行共享。在默认的AutoProxy-generated 类型不起作用的情况下,您需要提供合适的代理类型。对于defaultdict,如果只需要访问dict中已经存在的属性,可以使用DictProxy

    from multiprocessing import Pool
    from multiprocessing.managers import BaseManager, DictProxy
    from collections import defaultdict
    
    class MyManager(BaseManager):
        pass
    
    MyManager.register('defaultdict', defaultdict, DictProxy)
    
    def test(k, multi_dict):
        multi_dict[k] += 1
    
    if __name__ == '__main__':
        pool = Pool(processes=4)
        mgr = MyManager()
        mgr.start()
        multi_d = mgr.defaultdict(int)
        for k in 'mississippi':
            pool.apply_async(test, (k, multi_d))
        pool.close()
        pool.join()
        print multi_d.items()
    

    【讨论】:

    • 哇,成功了,谢谢。我不太明白你的修改,MyManager(BaseManager) 类的目的是什么?
    • @Lostsoul 是the documented way 添加对共享其他类型的支持,而不是 Manager 支持的类型。
    • @JanneKarila 你知道我在哪里可以找到所有代理类型的列表吗?
    • 很惊讶原来的 manager 类不支持这些集合,但非常感谢让我们知道这是可能的!
    【解决方案2】:

    好吧,Manager 类似乎只提供了固定数量的预定义数据结构,这些数据结构可以在进程之间共享,而defaultdict 不在其中。如果你真的只需要那个defaultdict,最简单的解决方案是自己实现默认行为:

    def test(k, multi_dict):
        if k not in multi_dict:
            multi_dict[k] = 0
        multi_dict[k] += 1
    

    【讨论】:

      猜你喜欢
      • 2021-05-22
      • 1970-01-01
      • 2018-03-10
      • 2023-02-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-16
      • 2022-01-18
      相关资源
      最近更新 更多