【问题标题】:Combining itertools and multiprocessing?结合 itertools 和多处理?
【发布时间】:2011-09-05 10:05:03
【问题描述】:

我有一个256x256x256 Numpy 数组,其中每个元素都是一个矩阵。我需要对这些矩阵中的每一个进行一些计算,并且我想使用multiprocessing 模块来加快速度。

这些计算的结果必须和原来的一样存储在256x256x256数组中,这样原数组中元素[i,j,k]处的矩阵的结果必须放在新数组的[i,j,k]元素中数组。

为此,我想创建一个列表,该列表可以以伪方式编写为[array[i,j,k], (i, j, k)],并将其传递给要“多处理”的函数。 假设matrices 是从原始数组中提取的所有矩阵的列表,myfunc 是进行计算的函数,代码看起来有点像这样:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

但是,map_async 似乎实际上首先创建了这个巨大的finput-list:我的 CPU 并没有做太多事情,但是内存和交换在几秒钟内就被完全消耗掉了,这显然不是什么我要。

有没有办法将这个巨大的列表传递给一个多处理函数,而无需先显式创建它? 或者你知道解决这个问题的另一种方法吗?

非常感谢! :-)

【问题讨论】:

  • 由于您在map_async() 上使用get(),您可能不想要异步 操作,而应该使用Pool.map()
  • 也许我没有正确理解这个问题,但是你考虑过 imap 还是 imap_unordered?

标签: python multiprocessing itertools


【解决方案1】:

所有multiprocessing.Pool.map* 方法在调用函数后立即完全使用迭代器(demo code)。要一次给迭代器的映射函数块提供一个块,请使用grouper_nofill

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it=iter(iterable)
    def take():
        while 1: yield list(itertools.islice(it,n))
    return iter(take().next,[])

chunksize=256
async_results=[]
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results=np.array(async_results)

PS。 pool.map_asyncchunksize 参数做了一些不同的事情:它将可迭代对象分成块,然后将每个块提供给调用 map(func,chunk) 的工作进程。如果func(item) 完成得太快,这可以为工作进程提供更多数据来咀嚼,但这对您的情况没有帮助,因为在发出map_async 调用后迭代器仍会立即被完全消耗。

【讨论】:

  • 非常感谢!您的解决方案似乎确实有效!作为参考,我不得不使用 pool.map_async(myfunc, finput).get(999999),但它有效!但是,它仍然使用大量内存(当然取决于确切的块大小),并且 python 在运行期间似乎没有进行垃圾收集。任何想法为什么会这样?
  • @digitaldingo:嗯,什么都没想到。如果您可以将代码缩减为 SSCCE 并发布在此处,那将是理想的选择。
【解决方案2】:

我也遇到了这个问题。而不是这个:

res = p.map(func, combinations(arr, select_n))

res = p.imap(func, combinations(arr, select_n))

imap 不消耗它!

【讨论】:

    【解决方案3】:

    Pool.map_async() 需要知道迭代的长度才能将工作分派给多个工作人员。由于izip 没有__len__,它首先将iterable 转换为列表,导致您正在经历巨大的内存使用。

    您可以尝试通过使用__len__ 创建自己的izip 样式迭代器来回避这个问题。

    【讨论】:

    • 为什么需要知道这些?为什么它不能简单地喂饱所有空闲的工人和等待?
    • @andrew - map_async() (multiprocessing/pool.py) 中的第一行实际上是 if not hasattr(iterable, '__len__'): iterable = list(iterable)。它需要知道长度以创建足够大的输出列表,因为工人的完成顺序是未知的。
    • 嗯。它可以动态构建,不是吗?我只是在想这可能会作为一个问题提出。这似乎是一个有效的请求。
    • 是的,没有__len__ 也可以,但这会非常复杂。如果结果#321 在#23 之前准备好,它应该存储在哪里?如果长度已知,这将变得更容易。
    • 这确实很有趣...Pool.map_async() 可能不知道长度,但我知道 (256^3) --- 是否可以明确告诉它长度?如果没有,也许它应该......
    猜你喜欢
    • 2018-12-11
    • 1970-01-01
    • 2019-11-18
    • 1970-01-01
    • 2023-03-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-05-18
    相关资源
    最近更新 更多