【问题标题】:numpy vs. multiprocessing and mmapnumpy 与多处理和 mmap
【发布时间】:2012-04-01 13:38:14
【问题描述】:

我正在使用 Python 的 multiprocessing 模块来并行处理大型 numpy 数组。这些数组在主进程中使用numpy.load(mmap_mode='r') 进行内存映射。之后,multiprocessing.Pool() 分叉了这个进程(我猜)。

一切似乎都运行良好,除了我得到如下行:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in `<bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>`
     ignored

在单元测试日志中。尽管如此,测试还是通过了。

知道那里发生了什么吗?

使用 Python 2.7.2、OS X、NumPy 1.6.1。


更新:

经过一些调试,我找到了一个代码路径的原因,该路径使用这个内存映射的 numpy 数组的(一小部分)作为Pool.imap 调用的输入。

显然,“问题”在于multiprocessing.Pool.imap 将其输入传递给新进程的方式:它使用pickle。这不适用于mmaped numpy 数组,并且内部中断会导致错误。

我发现 Robert Kern 的 this reply 似乎解决了同样的问题。他建议为imap 输入来自内存映射数组时创建一个特殊的代码路径:在生成的进程中手动内存映射相同的数组。

这将是如此复杂和丑陋,以至于我宁愿忍受错误和额外的内存副本。有没有其他方法可以更轻松地修改现有代码?

【问题讨论】:

    标签: python numpy multiprocessing mmap


    【解决方案1】:

    我通常的方法(如果你可以忍受额外的内存副本)是在一个进程中完成所有 IO,然后将事情发送到工作线程池。要将一个 memmapped 数组的切片加载到内存中,只需执行 x = np.array(data[yourslice])data[yourslice].copy() 实际上并没有这样做,这可能会导致一些混乱。)。

    首先,让我们生成一些测试数据:

    import numpy as np
    np.random.random(10000).tofile('data.dat')
    

    您可以通过以下方式重现您的错误:

    import numpy as np
    import multiprocessing
    
    def main():
        data = np.memmap('data.dat', dtype=np.float, mode='r')
        pool = multiprocessing.Pool()
        results = pool.imap(calculation, chunks(data))
        results = np.fromiter(results, dtype=np.float)
    
    def chunks(data, chunksize=100):
        """Overly-simple chunker..."""
        intervals = range(0, data.size, chunksize) + [None]
        for start, stop in zip(intervals[:-1], intervals[1:]):
            yield data[start:stop]
    
    def calculation(chunk):
        """Dummy calculation."""
        return chunk.mean() - chunk.std()
    
    if __name__ == '__main__':
        main()
    

    如果你只是切换到产生np.array(data[start:stop]),你会解决这个问题:

    import numpy as np
    import multiprocessing
    
    def main():
        data = np.memmap('data.dat', dtype=np.float, mode='r')
        pool = multiprocessing.Pool()
        results = pool.imap(calculation, chunks(data))
        results = np.fromiter(results, dtype=np.float)
    
    def chunks(data, chunksize=100):
        """Overly-simple chunker..."""
        intervals = range(0, data.size, chunksize) + [None]
        for start, stop in zip(intervals[:-1], intervals[1:]):
            yield np.array(data[start:stop])
    
    def calculation(chunk):
        """Dummy calculation."""
        return chunk.mean() - chunk.std()
    
    if __name__ == '__main__':
        main()
    

    当然,这确实会为每个块创建一个额外的内存副本。

    从长远来看,您可能会发现从内存映射文件切换到 HDF 之类的文件会更容易。如果您的数据是多维的,则尤其如此。 (我会推荐h5py,但如果您的数据是“类似表格的”,pyTables 会很好。)

    祝你好运,无论如何!

    【讨论】:

    • 乔,你的答案总是那么棒。我一直在试图弄清楚这样的事情。
    • 感谢 HDF 提示。看起来像一个巨大的变化,但它可能是值得的,我会检查一下。
    • HDF5 在多处理环境中有类似的问题。他们有一些基于 mpi 的解决方案,但恕我直言是重量级的。检查github.com/h5py/h5py/blob/…
    猜你喜欢
    • 2021-09-02
    • 2015-05-18
    • 1970-01-01
    • 2015-08-08
    • 2012-10-24
    • 1970-01-01
    • 2016-10-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多