【问题标题】:Python multiprocessing Pool / Process has inconsistent resultsPython多处理池/进程结果不一致
【发布时间】:2017-03-25 02:27:44
【问题描述】:

我的共享字典对象的条目数不一致。它应该有 500,但大多数测试最终都在 450 和 465 之间。我也尝试过使用 mapProcess 而不是 apply_async

map 稍微好一点,因为共享字典有大约 480 个条目,而不是大约 450 个,但它仍然不一致,并且不是预期的全部 500 个。

我也尝试过使用 Process,但这导致我的共享字典中的条目数量最少 - 大约 420 个。

这是使用apply_async的完整代码:

import numpy as np
from PIL import Image
from os import listdir
from multiprocessing import Manager, Pool

def processImage(path, d):
  image = np.array(Image.open(source + "/" + path))

  # Copy lists from shared dictionary since updates don't work otherwise
  w = d["width"]
  h = d["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  d["width"] = w
  d["height"] = h

if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []

  for path in listdir(source):
    p.apply_async(processImage, (path, d))
  p.close()
  p.join()

这是使用map的完整代码:

def processImage(obj):
  image = np.array(Image.open(source + "/" + obj[1]))

  w = obj[0]["width"]
  h = obj[0]["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  obj[0]["width"] = w
  obj[0]["height"] = h

if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []

  p.map(processImage, zip(itertools.repeat(d), listdir(source)))

这是使用Process的完整代码:

def processImage(path, d):
  image = np.array(Image.open(source + "/" + path))

  w = d["width"]
  h = d["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  d["width"] = w
  d["height"] = h

if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []

  jobs = []
  for img in listdir(source):
    p = Process(target=processImage, args=(img, d))
    p.start()
    jobs.append(p)

  for j in jobs:
    j.join()

【问题讨论】:

    标签: python multiprocessing pool


    【解决方案1】:

    这是竞争条件的经典示例。您需要某种同步原语来更新d

    考虑以下情况:有两个线程(在您的情况下是子进程)执行processImage。第一个得到wh,第二个得到wh。首先将一些内容附加到两者并将其放回d。第二个对它自己的wh 做了一些事情,这不再考虑第一个线程所做的更改,并将其放回d。此时,第一个线程所做的更改将丢失。

    要解决这个问题,您需要保护使用 d 的部分代码:

    from multiprocessing import Manager, Pool, Lock
    ...
    lock = Lock()
    ...
    def processImage(path, d):
        image = np.array(Image.open(source + "/" + path))
    
        lock.acquire()
        d["width"].append(image.shape[0])
        d["height"].append(image.shape[1])
        lock.release()
    

    【讨论】:

    • 感谢您的解释——这是有道理的,但不幸的是,并没有解决我的问题:/ imgur.com/a/2eWJ8
    • 忽略我上面的评论,将锁定获取和释放移动到复制列表之前和之后解决了问题!
    猜你喜欢
    • 2013-12-01
    • 2014-08-25
    • 1970-01-01
    • 2022-01-08
    • 1970-01-01
    • 2022-11-30
    • 2015-01-13
    • 1970-01-01
    • 2019-11-10
    相关资源
    最近更新 更多