【问题标题】:Giving access to shared memory after child processes have already started在子进程已经启动后授予对共享内存的访问权限
【发布时间】:2011-11-17 03:55:19
【问题描述】:

如果数据仅在子进程生成后才可用(使用multiprocessing.Process),我如何让子进程访问共享内存中的数据?

我知道multiprocessing.sharedctypes.RawArray,但我不知道如何让我的子进程访问在进程已经启动后创建的RawArray

数据由父进程生成,数据量事先不知道。

如果不是GIL,我会改用线程,这将使这项任务更简单一些。使用非 CPython 实现不是一种选择。


查看muliprocessing.sharedctypes 的底层,看起来共享ctype 对象分配给using mmaped memory

所以这个问题真的可以归结为:如果在子进程生成后父进程调用mmap(),子进程能否访问匿名映射的内存?

这在一定程度上与this question 中的要求相似,只是在我的情况下,mmap() 的调用者是父进程而不是子进程。


(已解决)

我创建了自己的RawArray 版本,它在底层使用shm_open()。只要标识符 (tag) 匹配,生成的共享 ctypes 数组就可以与任何进程共享。

有关详细信息和示例,请参阅this answer

【问题讨论】:

  • 您不能以消息容器(序列、RawArray 等)作为参数启动您的进程吗?虽然最初是空的,但它将作为参考传递(此处不确定)并且进程应该能够读取(和写入)它......还是我弄错了?
  • 事先不知道元素的数量,所以我还不能创建容器(除非我创建一个非常巨大的容器来满足所有可能性)。
  • 如果你创建了[],你可以在以后随意调整它的大小,而它仍然是同一个对象......我错过了什么吗?
  • 这不会跨进程工作。
  • 为什么不从文件或管道/套接字对等基本 IPC 开始?

标签: python ipc multiprocessing shared-memory


【解决方案1】:

免责声明:我是问题的作者。

我最终使用posix_ipc 模块创建了我自己的RawArray 版本。我主要使用posix_ipc.SharedMemory,它在后台调用shm_open()

我的实现 (ShmemRawArray) 公开了与 RawArray 相同的功能,但需要两个附加参数 - 一个 tag 来唯一标识共享内存区域,以及一个 create 标志来确定我们是否应该创建一个新的共享内存段或附加到现有的。

如果有人感兴趣,这里是一份副本:https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

使用说明:

  • 前两个参数(typecode_or_typesize_or_initializer)应该与 RawArray 一样工作。
  • 只要tag 匹配,任何进程都可以访问共享数组。
  • 当原始对象(ShmemRawArray(..., create=True)返回)被删除时,共享内存段被取消链接
  • 使用当前存在的tag 创建共享数组将引发ExistentialError
  • 使用不存在(或已取消链接)的tag 访问共享数组也会引发ExistentialError

一个SSCCE(简短、独立、可编译的示例)展示了它的实际应用。

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

运行此程序会产生以下输出:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']

【讨论】:

  • 非常酷!与 RawArray 相比,这是否具有性能优势?
【解决方案2】:

您的问题听起来非常适合 posix_ipc or sysv_ipc modules,它公开了用于共享内存、信号量和消息队列的 POSIX 或 SysV API。那里的功能矩阵包括在他提供的模块中挑选的极好建议。

匿名mmap(2) 区域的问题在于,您无法轻松地与其他进程共享它们——如果它们是文件支持的,那会很容易,但如果您实际上不需要该文件来做其他事情,感觉很傻。您可以clone(2) 系统调用中使用CLONE_VM 标志,如果这是在C 中,但我不想尝试将它与可能对内存安全做出假设的语言解释器一起使用。 (即使在 C 语言中也会有点危险,因为五年后的维护程序员可能CLONE_VM 的行为感到震惊。)

但是 SysV 和更新的 POSIX 共享内存映射甚至允许不相关的进程通过标识符附加和分离共享内存,因此您需要做的就是与使用映射的进程共享创建映射的进程的标识符,然后当您在映射中操作数据时,它们可同时供所有进程使用,而无需任何额外的解析开销。 shm_open(3) 函数返回一个int,它在以后调用ftruncate(2)mmap(2) 时用作文件描述符,因此其他进程可以使用共享内存段而无需在文件系统中创建文件——并且即使使用它的所有进程都已退出,此内存仍将持续存在。 (对于 Unix 来说可能有点奇怪,但它很灵活。)

【讨论】:

  • 啊哈!这听起来可行。我会试一试,然后回复你。谢谢! (我一直在寻找一个暴露shm_open 和朋友的python 模块,我发现的只是shm 看起来有点陈旧)。
  • 你的帖子把我带到了a solution。非常感谢。
  • @sarnold - 而不是 clone() 我强烈希望将 minherit 用于映射区域
【解决方案3】:

我想你在找mmap module

关于数据的序列化this question当然回答如果你希望避免复制我没有解决方案

编辑

事实上,您可以使用 CPython 3.2 中的非 stdlib _mutliprocessing 模块来获取 mmap 对象的地址,并将其与 ctypes 对象的 from_address 一起使用 它实际上是什么 RawArray 实际上你不应该尝试调整 mmap 对象的大小,因为在这种情况下 mmap 的地址可能会改变

import mmap
import _multiprocessing
from ctypes import Structure,c_int

map = mmap.mmap(-1,4)
class A(Structure):
    _fields_ = [("x", c_int)]
x = _multiprocessing.address_of_buffer(map)
b=A.from_address(x[0])
b.x = 256

>>> map[0:4]
'\x00\x01\x00\x00'

要在创建子对象后公开内存,您必须将内存映射到正在调用的真实文件

map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)

【讨论】:

  • 除非我弄错了,否则我需要序列化我的数据,然后将其加载到子进程中。这就是我要避免的。
  • 据我了解,您尝试避免广播消息并没有真正避免序列化的问题
  • 我担心性能和内存使用情况。必须在所有 proc 上反序列化相同的数据听起来并不吸引人,因此我对序列化的评论。
  • 感谢泽维尔的更新。恐怕我不明白实现我自己的 mmap 模块对我的情况有何帮助。
  • 重新更新。这几乎就是在multiprocessing.sharedctypes.RawArray 的幕后所做的事情,而且一切都像宣传的那样工作。我正在尝试将共享内存公开给子进程它们已经产生之后。
猜你喜欢
  • 1970-01-01
  • 2012-04-23
  • 1970-01-01
  • 1970-01-01
  • 2013-10-16
  • 1970-01-01
  • 1970-01-01
  • 2011-03-31
  • 2016-01-17
相关资源
最近更新 更多