【问题标题】:Multiprocessing - shared memory with multidimensional numpy array多处理 - 具有多维 numpy 数组的共享内存
【发布时间】:2018-10-20 13:46:12
【问题描述】:

我需要并行处理一个非常大的 numpy 数组 (55x117x256x256)。尝试使用通常的多处理方法传递它会产生 AssertionError,我理解这是因为数组太大而无法复制到每个进程中。因此,我想尝试将共享内存与多处理一起使用。 (我对其他方法持开放态度,只要它们不太复杂)。

我看到一些关于使用 python 多处理的共享内存方法的问题,例如

import numpy as np
import multiprocessing as mp

unsharedData = np.zeros((10,))
sharedData = mp.Array('d', unsharedData)

这似乎工作正常。但是,我还没有看到使用多维数组完成此操作的示例。

我尝试将多维数组粘贴到mp.Array 中,这给了我TypeError: only size-1 arrays can be converted to Python scalars

unsharedData2 = np.zeros((10,10))
sharedData2 = mp.Array('d', unsharedData2)## Gives TypeError

我可以展平阵列,但如果可以避免,我宁愿不这样做。

是否有一些技巧可以让 multiprocessing Array 处理多维数据?

【问题讨论】:

标签: python numpy python-multiprocessing


【解决方案1】:

虽然已经给出了多处理的答案,但存在使用ray 的替代方案,这是一个替代的多处理框架。

使用ray,您可以使用obj_ref = ray.put(obj) 将任何对象放入只读共享内存中。好消息是 ray 内置了对从共享内存中零拷贝检索 numpy 数组的支持。

使用共享内存的光线实现会有一点开销,但考虑到数组如此之大,这可能不会成为问题。

import numpy as np
import ray

@ray.remote
def function(arr, num: int):
    # array is automatically retrieved if a reference is passed to
    # a remote function, you could do this manually with ray.get(ref)
    return arr.mean() + num

if __name__ == '__main__':
    ray.init()
    # generate array and place into shared memory, return reference
    array_ref = ray.put(np.random.randn(55, 117, 256, 256))
    # multiple processes operating on shared array
    results = ray.get([function.remote(array_ref, i) for i in range(8)])
    print(results)

【讨论】:

    【解决方案2】:

    您可以使用与Array 关联的get_obj() 方法在共享相同内存的每个进程中创建一个新的多维numpy 数组,该方法返回呈现缓冲区接口的ctypes 数组。

    请看下面的例子:

    import ctypes as c
    import numpy as np
    import multiprocessing as mp
    
    
    unsharedData2 = np.zeros((10, 10))
    n, m = unsharedData2.shape[0], unsharedData2.shape[1]
    
    
    def f1(mp_arr):
        #in each new process create a new numpy array as follows:
        arr = np.frombuffer(mp_arr.get_obj())
        b = arr.reshape((n, m))# mp_arr arr and b share the same memory
        b[2][1] = 3
    
    
    def f2(mp_arr):
        #in each new process create a new numpy array as follows:
        arr = np.frombuffer(mp_arr.get_obj())
        b = arr.reshape((n, m)) # mp_arr arr and b share the same memory
        b[1][1] = 2
    
    
    if __name__ == '__main__':
        mp_arr = mp.Array(c.c_double, n*m)
        p = mp.Process(target=f1, args=(mp_arr,))
        q = mp.Process(target=f2, args=(mp_arr,))
        p.start()
        q.start()
        p.join()
        q.join()
        arr = np.frombuffer(mp_arr.get_obj())
        b = arr.reshape((10, 10))
        print(b)
        '''
        [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 2. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 3. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
         [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
        '''
    

    【讨论】:

      【解决方案3】:

      您可以使用 np.reshape((-1,))np.ravel 而不是 np.flatten 来制作数组的一维 view,而无需进行 flatten 所做的不必要复制:

      import numpy as np
      import multiprocessing as mp
      
      unsharedData2 = np.zeros((10, 10))
      ravel_copy = np.ravel(unsharedData2)
      reshape_copy2 = unsharedData2.reshape((-1,))
      ravel_copy[11] = 1.0       # -> saves 1.0 in unsharedData2 at [1, 1]
      reshape_copy2[22] = 2.0    # -> saves 2.0 in unsharedData2 at [2, 2]
      sharedData2 = mp.Array('d', ravel_copy)
      sharedData2 = mp.Array('d', reshape_copy2)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-01-02
        • 2023-03-15
        • 2011-12-15
        • 2021-03-19
        • 1970-01-01
        • 1970-01-01
        • 2013-09-06
        相关资源
        最近更新 更多