【问题标题】:Shared memory in multiprocessing多处理中的共享内存
【发布时间】:2012-12-16 23:16:47
【问题描述】:

我有三个大列表。第一个包含位数组(模块位数组 0.8.0),另外两个包含整数数组。

l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]

这些数据结构需要相当多的 RAM(总共约 16GB)。

如果我使用以下方法启动 12 个子流程:

multiprocessing.Process(target=someFunction, args=(l1,l2,l3))

这是否意味着将为每个子进程复制 l1、l2 和 l3,还是子进程将共享这些列表?或者更直接地说,我会使用 16GB 还是 192GB 的 RAM?

someFunction 将从这些列表中读取一些值,然后根据读取的值执行一些计算。结果将返回给父进程。列表 l1、l2 和 l3 不会被 someFunction 修改。

因此,我假设子进程不需要也不会复制这些巨大的列表,而是与父进程共享它们。这意味着由于 linux 下的写时复制方法,该程序将占用 16GB 的 RAM(不管我启动了多少子进程)? 我是正确的还是我遗漏了会导致列表被复制的内容?

编辑: 在阅读了有关该主题的更多内容后,我仍然感到困惑。一方面,Linux 使用写时复制,这应该意味着没有数据被复制。另一方面,访问对象会改变它的引用计数(我仍然不确定为什么以及这意味着什么)。即便如此,是否会复制整个对象?

例如,如果我定义 someFunction 如下:

def someFunction(list1, list2, list3):
    i=random.randint(0,99999)
    print list1[i], list2[i], list3[i]

使用这个函数是否意味着l1、l2和l3将被完全复制到每个子进程?

有没有办法检查这个?

EDIT2 在阅读更多内容并在子进程运行时监控系统的总内存使用情况后,似乎确实为每个子进程复制了整个对象。这似乎是因为引用计数。

在我的程序中实际上不需要 l1、l2 和 l3 的引用计数。这是因为 l1、l2 和 l3 将保留在内存中(不变),直到父进程退出。在此之前不需要释放这些列表使用的内存。事实上,我确信引用计数将保持在 0 以上(对于这些列表和这些列表中的每个对象),直到程序退出。

所以现在问题变成了,我怎样才能确保对象不会被复制到每个子流程?我可以禁用这些列表和这些列表中的每个对象的引用计数吗?

EDIT3 只是一个附加说明。子流程不需要修改l1l2l3 或这些列表中的任何对象。子进程只需要能够引用其中一些对象,而不需要为每个子进程复制内存。

【问题讨论】:

  • stackoverflow.com/questions/10721915/… 类似的问题和你的答案。
  • 阅读它,但仍然不确定答案。是否会复制整个对象?只是对象的一部分?仅包含引用计数的页面?怎么查?
  • 由于写时复制,我认为你不应该做任何特别的事情。为什么不试试呢?
  • 试过了,列表被复制了。这似乎是因为如果我在子进程中执行 l1_0=l1[0] 那么这会增加 l1 的引用计数器。所以虽然我没有更改数据,但我已经更改了对象,这会导致内存被复制。
  • @anti666 非常感谢这篇文章/问题。我想我在引用计数等方面遇到了一些相同的问题。您是否尝试过 Numpy 数组,以至少减少可能计算引用的对象?另外,由于您没有提及您的测量方法,请确保使用smem 的 PSS 统计信息;仅查看 RSS 并没有显示任何有用的信息,因为它会重复计算共享内存。

标签: python multiprocessing shared-memory large-data


【解决方案1】:

一般来说,共享相同数据有两种方式:

  • 多线程
  • 共享内存

Python 的多线程不适合 CPU 密集型任务(因为 GIL),所以在这种情况下通常的解决方案是继续 multiprocessing。但是,使用此解决方案,您需要使用multiprocessing.Valuemultiprocessing.Array 显式共享数据。

请注意,由于所有同步问题,通常在进程之间共享数据可能不是最佳选择;涉及参与者交换消息的方法通常被视为更好的选择。另见Python documentation

如上所述,在进行并发编程时,通常是 最好尽量避免使用共享状态。这是 在使用多个进程时尤其如此。

但是,如果您确实需要使用一些共享数据,那么 多处理提供了几种方法。

在您的情况下,您需要以multiprocessing 可以理解的某种方式包装l1l2l3(例如,使用multiprocessing.Array),然后将它们作为参数传递。
另请注意,正如您所说,您不需要写访问权限,那么您应该在创建对象时传递lock=False,否则所有访问权限仍将被序列化。

【讨论】:

  • 我可以使用multiprocessing.Array 来包装任意对象的列表,例如bitarray()吗?
  • 或者,如果 bitarray 支持协议缓冲区,您可以将其作为 bytearray 共享,然后在生成的进程中将其转换回 bitarray。
  • 决定将l2l3 转换为'multiprocessing.Array' 对象的元组。希望这些对象(数据的最大部分)不会为每个子流程完全复制。这将在一定程度上缓解问题。最终的解决方案是用 C 重写程序,因为它会更快并且没有这个问题。
  • 使用共享内存,你不应该有这个问题,在 Python 中也是如此。
  • multiprocessing.Value 和 multiprocessing.Array 强制您使用原始 C 数据类型。他们确实确保内存是共享的,但这并不像问题帖子所问的仅使用 Linux 的 CoW 行为那么简单。我有一种预感,提问者的假设是引用计数正在破坏它是正确的。
【解决方案2】:

因为这在 google 上仍然是一个非常高的结果,而且还没有其他人提到它,所以我想我会提到在 python 版本 3.8.0 中引入的“真正”共享内存的新可能性:https://docs.python.org/3/library/multiprocessing.shared_memory.html

我在这里包含了一个小例子(在 linux 上测试),其中使用了 numpy 数组,这可能是一个非常常见的用例:

# one dimension of the 2d array which is shared
dim = 5000

import numpy as np
from multiprocessing import shared_memory, Process, Lock
from multiprocessing import cpu_count, current_process
import time

lock = Lock()

def add_one(shr_name):

    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray((dim, dim,), dtype=np.int64, buffer=existing_shm.buf)
    lock.acquire()
    np_array[:] = np_array[0] + 1
    lock.release()
    time.sleep(10) # pause, to see the memory usage in top
    print('added one')
    existing_shm.close()

def create_shared_block():

    a = np.ones(shape=(dim, dim), dtype=np.int64)  # Start with an existing NumPy array

    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # # Now create a NumPy array backed by shared memory
    np_array = np.ndarray(a.shape, dtype=np.int64, buffer=shm.buf)
    np_array[:] = a[:]  # Copy the original data into shared memory
    return shm, np_array

if current_process().name == "MainProcess":
    print("creating shared block")
    shr, np_array = create_shared_block()

    processes = []
    for i in range(cpu_count()):
        _process = Process(target=add_one, args=(shr.name,))
        processes.append(_process)
        _process.start()

    for _process in processes:
        _process.join()

    print("Final array")
    print(np_array[:10])
    print(np_array[10:])

    shr.close()
    shr.unlink()

请注意,由于 64 位整数,此代码可能需要大约 1gb 的 ram 才能运行,因此请确保您不会在使用它时冻结您的系统。 ^_^

【讨论】:

  • 亲爱的@Rboreal_Frippery,感谢您的出色回答。我想知道是否会有另一种方法来确保生成的进程数不超过 CPU 中的内核数。类似于 multiprocessing.Pool 对象。如果有这样一种方法,如何使用 Processes 来实现它?
  • @PhilipeRiskallaLeal 流程本身并不占用整个核心。您可以拥有比 CPU 内核更多的进程...
  • 感谢您的出色回答。只是想链接到与此类似的答案,其中包括内存跟踪比较:mingze-gao.com/posts/python-shared-memory-in-multiprocessing
【解决方案3】:

对于那些对使用 Python3.8 的 shared_memory 模块感兴趣的人,它仍然有一个 bug 尚未修复,目前正在影响 Python3.8/3.9/3.10 (2021-01-15) .该错误会影响 posix 系统,并且是关于资源跟踪器在其他进程仍应具有有效访问权限时破坏共享内存段。所以在你的代码中使用它时要小心。

【讨论】:

  • 我体验到这个资源跟踪器破坏了共享内存。作为一种解决方法,我将共享内存存储在一个列表中。所以这个共享内存被链接到一个数据结构,资源跟踪器不能破坏它。我的python版本是python3.8
  • 该错误在 2022 年 1 月 1 日仍然存在,但在 POSIX 系统的错误讨论中似乎有一个猴子补丁解决方案。对于 Windows,我通过从 Lib\multiprocessing\shared_memory.py 中删除这些行(~第 152 行)来消除该错误。只要确保自己正确 unlink()(我使用 atexit.register(shm.unlink)),你应该会很好。 finally: _winapi.CloseHandle(h_map)
【解决方案4】:

如果您想使用写时复制功能并且您的数据是静态的(在子进程中未更改) - 您应该让 python 不要弄乱数据所在的内存块。您可以通过使用 C 或 C++ 结构(例如 stl)作为容器轻松地做到这一点,并提供您自己的 python 包装器,当创建 python 级对象时,将使用指向数据内存(或可能复制数据内存)的指针。 . 所有这一切都可以通过几乎 Python 的简单性和 cython 的语法轻松完成。

# 伪cython cdef 类 FooContainer: cdef 字符 * 数据 def __cinit__(self, char * foo_value): self.data = malloc(1024, sizeof(char)) memcpy(self.data, foo_value, min(1024, len(foo_value))) 定义获取(自我): 返回self.data # 蟒蛇部分 从 foo 导入 FooContainer f = FooContainer("你好世界") pid = fork() 如果不是 pid: f.get() # 这个调用将读取相同的内存页到哪里 # 父进程写入了 1024 个字符的 self.data # 并且 cython 会自动创建一个新的 python 字符串 # 从中获取对象并返回给调用者

上面的伪代码写得不好。不要使用它。在你的情况下,self.data 应该是 C 或 C++ 容器。

【讨论】:

    【解决方案5】:

    您可以使用 memcached 或 redis 并将每个设置为键值对 {'l1'...

    【讨论】:

    • 我认为redis正在阻塞。因此,如果需要多个读取器访问共享结构,那么 mp.Array/mp.Value 可能是更好的解决方案。这一切都取决于应用程序
    猜你喜欢
    • 1970-01-01
    • 2012-05-30
    • 2015-03-15
    • 2021-10-15
    • 2013-08-19
    • 2016-12-27
    • 2021-12-27
    • 2021-09-13
    相关资源
    最近更新 更多