【发布时间】:2019-04-20 02:10:01
【问题描述】:
我有一个函数必须遍历图像的各个像素并计算一些几何图形。此功能需要很长时间才能运行(在 24 兆像素图像上约 5 小时),但似乎应该很容易在多个内核上并行运行。但是,我终其一生都找不到一个有据可查、解释清楚的使用 Multiprocessing 包执行此类操作的示例。这是我现在作为玩具示例运行的代码:
import numpy as np
import matplotlib.pyplot as plt
from scipy import misc
from skimage import color
import multiprocessing
from multiprocessing import Process
#Some dumb stand in function for this exercise
def dumb_func(image):
ny, nx = image.shape
temp = np.empty_like(image)
for y in range(ny):
for x in range(nx):
temp[y, x] = np.square(image[y, x])
return temp
#Convert image to greyscale
img = color.rgb2gray(misc.ascent())
#Resize the image
ns = 2048 #Pixel size
img = misc.imresize(img, size = (ns, ns))
#Split the image into equal chunks...not sure how this works for arrays that
#are weird shapes and aren't the same size in each dimension
divs = 4
init_split = np.array_split(img, divs, axis = 0)
side = init_split[0].shape[0]
chunked = np.empty((divs, divs, side, side))
cur = 0
for i in range(divs):
split = np.array_split(init_split[i], divs, axis = 1)
for j in range(divs):
chunked[i, j, :, :] = split[j]
cur +=1
#Pull core count and divide by two to be safe
cores = int(multiprocessing.cpu_count() / 2)
result = np.empty_like(chunked)
idxs = np.array(np.meshgrid(np.arange(0, divs, 1),
np.arange(0, divs, 1))).T.reshape(-1, 2)
基本上,这段代码加载到图像中,将其转换为灰度,使其变大,然后将其分块。分块数组的形状为 (i, j, ny, nx),其中 i 和 j 是标识我正在使用的图像块的索引,而 ny,nx 描述每个块的像素大小。
此外,我正在创建一个名为 idxs 的数组,它将所有可能的索引存储到分块数组中,以将分块图像拉出。
我想要做的是在块上并行运行一个函数(在本例中以dumb_func为例)并将结果存储在相同形状的结果数组中。我想象的方法是遍历 idxs 数组并分配处理属于这些索引的块,直到核心数,等待这些核心完成,然后为核心提供更多进程直到完成。我被卡住了,因为我无法 A) 弄清楚如何访问函数中的返回值,以及 B) 如何处理我可能有 16 个块和 5 个内核导致最后一次迭代只需要一个进程的情况。
我该怎么做呢?在过去的 6 到 7 个小时里,我一直在阅读有关多处理池、进程、地图、星图等方面的信息……但我一生都无法理解如何实现这一点。
为 Reedinationer 编辑:
这是我更新的代码,运行时没有错误。但是 new_data 数组永远不会更新。我用值 100 填充它,并且在例程 new_data 的末尾正是它的初始化方式。
import numpy as np
import matplotlib.pyplot as plt
from scipy import misc
from multiprocessing import Process, JoinableQueue
from time import time
#SOme dumb stand in function for this exercise
def dumb_func(q, new_data):
while True:
index, image = q.get()
temp = image **2
new_data[index[0], index[1], :, :] = temp
q.task_done()
if __name__ == "__main__":
start = time()
q = JoinableQueue()
img = misc.ascent()
#Resize the image
ns = 2048 #Pixel size
img = misc.imresize(img, size = (ns, ns))
#Split the image into equal chunks...not sure how this works for arrays that
#are weird shapes and aren't the same size in each dimension
divs = 4
init_split = np.array_split(img, divs, axis = 0)
side = init_split[0].shape[0]
chunked = np.empty((divs, divs, side, side))
cur = 0
for i in range(divs):
split = np.array_split(init_split[i], divs, axis = 1)
for j in range(divs):
chunked[i, j, :, :] = split[j]
cur +=1
new_data = np.full(chunked.shape, 100)
idxs = np.array(np.meshgrid(np.arange(0, divs, 1),
np.arange(0, divs, 1))).T.reshape(-1, 2)
for i in range(len(idxs)):
q.put((idxs[i], chunked[idxs[i][0], idxs[i][1], :, :]))
print ('starting workers')
worker_count = len(idxs)
processes = []
for i in range(worker_count):
p = Process(target=dumb_func, args=[q, new_data])
p.daemon = True
p.start()
print('main thread waiting')
q.join()
end = time()
print('{:.3f} seconds elapsed'.format(end - start))
【问题讨论】:
-
请澄清您的示例,以显示您想如何以及在何处致电
dumb_func。通常,您需要一个multiprocessing.Pool(),然后是map您对该池的调用。
标签: python python-3.x multiprocessing