【问题标题】:多处理:在进程之间共享一个大的只读对象?
【发布时间】:2010-10-14 04:15:19
【问题描述】:

通过multiprocessing 产生的子进程是否共享程序之前创建的对象?

我有以下设置:

do_some_processing(filename):
    for line in file(filename):
        if line.split(',')[0] in big_lookup_object:
            # something here

if __name__ == '__main__':
    big_lookup_object = marshal.load('file.bin')
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob('*.data'))

我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作人员池。大对象是只读访问的,我不需要在进程之间传递它的修改。

我的问题是:大对象是否加载到共享内存中,就像我在 unix/c 中生成一个进程一样,还是每个进程都加载自己的大对象副本?

更新:进一步澄清 - big_lookup_object 是一个共享查找对象。我不需要将其拆分并单独处理。我需要保留一份。我需要拆分它的工作是读取许多其他大文件并根据查找对象查找这些大文件中的项目。

进一步更新:数据库是一个很好的解决方案,memcached 可能是一个更好的解决方案,磁盘上的文件(搁置或 dbm)可能会更好。在这个问题中,我对内存解决方案特别感兴趣。对于最终解决方案,我将使用 hadoop,但我想看看我是否也可以拥有本地内存版本。

【问题讨论】:

  • 您编写的代码将为父级和每个子级调用marshal.load(每个进程都导入模块)。
  • 你是对的,更正了。
  • 对于“本地内存”,如果您想避免复制以下内容可能有用docs.python.org/library/…
  • 分享号生成的进程(例如 fork 或 exec)是调用进程的完全副本...但在不同的内存中。要让一个进程与另一个进程通信,您需要 进程间通信 或 IPC 读取/写入某个 共享​​> 内存位置。
  • 你可以使用functools的部分函数,​​你必须在do_some_processing中设置big_lookup_object一个参数,当你想将一个lambda传递给Pool.map()或普通python@时它也很有用987654328@

标签: python multiprocessing


【解决方案1】:

子进程是否通过程序之前创建的多进程共享对象产生?

否(python 3.8 之前),和Yes in 3.8

进程有独立的内存空间。

解决方案 1

要充分利用拥有大量工人的大型结构,请执行此操作。

  1. 将每个worker写成一个“过滤器”——从stdin读取中间结果,工作,将中间结果写入stdout

  2. 将所有工作人员作为管道连接:

    process1 <source | process2 | process3 | ... | processn >result
    

每个进程读取、执行和写入。

这是非常有效的,因为所有进程都是同时运行的。写入和读取直接通过进程之间的共享缓冲区。


解决方案 2

在某些情况下,您有一个更复杂的结构 - 通常是 扇出 结构。在这种情况下,您的父母有多个孩子。

  1. 父级打开源数据。父母分叉了许多孩子。

  2. 父级读取源,将源的一部分分配给每个同时运行的子级。

  3. 当父母到达终点时,关闭管道。孩子获得文件结尾并正常完成。

儿童部分写起来很愉快,因为每个孩子都简单地阅读sys.stdin

父母在产生所有孩子和正确保留管道方面有一点花哨的步法,但还不错。

Fan-in 是相反的结构。许多独立运行的进程需要将它们的输入交错到一个公共进程中。收集器不那么容易编写,因为它必须从许多来源中读取。

通常使用select 模块从许多命名管道中读取数据,以查看哪些管道有待处理的输入。


解决方案 3

共享查找是数据库的定义。

解决方案 3A – 加载数据库。让工作人员处理数据库中的数据。

解决方案 3B – 使用 werkzeug(或类似的)创建一个非常简单的服务器,以提供响应 HTTP GET 的 WSGI 应用程序,以便工作人员可以查询服务器。


解决方案 4

共享文件系统对象。 Unix OS 提供共享内存对象。这些只是映射到内存的文件,以便交换 I/O 而不是更多的约定缓冲读取。

您可以通过多种方式从 Python 上下文中执行此操作

  1. 编写一个启动程序,(1) 将你原来的巨大对象分解成更小的对象,(2) 启动工作人员,每个工作人员都有一个更小的对象。较小的对象可以是腌制的 Python 对象,以节省一点点文件读取时间。

  2. 编写一个启动程序,该程序 (1) 读取您的原​​始巨大对象并使用 seek 操作写入一个页面结构的字节编码文件,以确保通过简单的查找可以轻松找到各个部分。这就是数据库引擎所做的——将数据分成页面,通过seek 轻松定位每个页面。

生成可以访问这个大型页面结构文件的工作人员。每个工人都可以找到相关的部分并在那里工作。

【讨论】:

  • 我的流程并不是真正的过滤器;它们都是一样的,只是处理不同的数据。
  • 它们通常可以构造为过滤器。他们读取他们的数据,完成他们的工作,然后写出他们的结果以供以后处理。
  • 我喜欢您的解决方案,但是阻塞 I/O 会发生什么?如果父母阻止读取/写入其中一个孩子怎么办? Select 确实会通知您可以写,但没有说明写多少。阅读也一样。
  • 这些是独立的过程——父母和孩子不会互相干扰。在管道一端生成的每个字节都可以在另一端立即使用——管道是一个共享缓冲区。不确定您的问题在这种情况下意味着什么。
  • 我可以验证 S.Lott 所说的话。我需要对单个文件执行相同的操作。因此,第一个工作人员在编号为 % 2 == 0 的每一行上运行其函数并将其保存到一个文件中,并将其他行发送到下一个管道进程(这是相同的脚本)。运行时间减少了一半。这有点 hacky,但开销比 multiprocessing 模块中的 map/poop 轻得多。
【解决方案2】:

通过多进程生成的子进程是否共享程序中较早创建的对象?

这取决于。对于全局只读变量,通常可以考虑(除了消耗的内存),否则不应该。

multiprocessing 的文档说:

Better to inherit than pickle/unpickle

在 Windows 上,许多类型来自 多处理需要是可腌制的 以便子进程可以使用它们。 但是,通常应该避免 将共享对象发送给其他人 使用管道或队列的进程。 相反,您应该安排程序 这样一个需要访问的进程 在别处创建的共享资源 可以从祖先那里继承 过程。

Explicitly pass resources to child processes

在 Unix 上,子进程可以使用 在一个创建的共享资源 使用全局的父进程 资源。然而,最好是 将对象作为参数传递给 子进程的构造函数。

除了制作代码 (可能)与 Windows 兼容 这也确保了只要 子进程还活着 对象不会被垃圾回收 在父进程中。这可能是 如果某些资源被释放,这很重要 当对象被垃圾回收时 在父进程中。

Global variables

请记住,如果代码在 子进程尝试访问全局 变量,然后是它看到的值(如果 any) 可能与值不同 在当时的父进程中 Process.start() 被调用。

示例

在 Windows 上(单 CPU):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let's try mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make more apparant the effect

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

sleep:

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

没有sleep

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4

【讨论】:

  • 嗯? z 如何在进程间共享??
  • @cbare:好问题! z 实际上不是共享的,正如 sleep 的输出所示。没有 sleep 的输出表明 single 进程处理(PID = 1148)所有工作;我们在最后一个例子中看到的是这个单一进程的 z 值。
  • 此答案显示z 未共享。因此,这回答了这个问题:“不,至少在 Windows 下,父变量不会在子级之间共享”。
  • @EOL:从技术上讲你是正确的,但实际上如果数据是只读的(与z 情况不同),它可以被认为是共享的。
  • 澄清一下,请记住,如果在子进程中运行的代码尝试访问全局变量... 在 2.7 文档中是指 Python 在窗户。
【解决方案3】:

S.Lott 是正确的。 Python 的多处理快捷方式有效地为您提供了一个单独的、重复的内存块。

在大多数 *nix 系统上,使用对os.fork() 的较低级别调用实际上会为您提供写时复制内存,这可能是您的想法。 AFAIK,理论上,在最简单的程序中,您可以读取该数据而无需复制。

然而,在 Python 解释器中事情并不那么简单。对象数据和元数据存储在同一个内存段中,因此即使对象永远不会更改,诸如该对象的引用计数器之类的递增操作也会导致内存写入,从而导致复制。几乎所有的 Python 程序所做的不仅仅是“print 'hello'”都会导致引用计数增加,因此您可能永远不会意识到写时复制的好处。

即使有人设法破解了 Python 中的共享内存解决方案,尝试跨进程协调垃圾收集也可能会非常痛苦。

【讨论】:

  • 这种情况下只会复制引用计数的内存区域,不一定是大的只读数据,不是吗?
【解决方案4】:

如果你在 Unix 下运行,它们可能共享同一个对象,因为 how fork works(即,子进程有单独的内存,但它是写时复制的,所以只要没有人修改它就可以共享它)。我尝试了以下方法:

import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

得到以下输出:

$ ./mtest.py 23 22995656 1 23 22995656 2 23 22995656 3 23 22995656 4

当然,这并不能证明没有制作副本,但您应该能够通过查看ps 的输出来验证您的情况,看看有多少每个子进程正在使用的实际内存。

【讨论】:

  • 垃圾收集器呢?运行时会发生什么?内存布局不会变吗?
  • 这是一个有效的担忧。它是否会影响 Parand 将取决于他如何使用所有这些以及该代码必须有多可靠。如果它对他不起作用,我建议使用 mmap 模块进行更多控制(假设他想坚持这种基本方法)。
  • 我已经发布了对您示例的更新:stackoverflow.com/questions/659865/…
  • @JacobGabrielson:副本已制作完成。最初的问题是关于是否复制。
【解决方案5】:

不同的进程有不同的地址空间。就像运行解释器的不同实例一样。这就是 IPC(进程间通信)的用途。

您可以为此使用队列或管道。如果您想稍后通过网络分发进程,您也可以使用 rpc over tcp。

http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes

【讨论】:

  • 我认为 IPC 不适合这个;这是每个人都需要访问的只读数据。在进程之间传递它是没有意义的;在最坏的情况下,每个人都可以阅读自己的副本。我试图通过在每个进程中没有单独的副本来节省内存。
  • 您可以让一个主进程将数据片段委派给其他从属进程。从站可以请求数据,也可以推送数据。这样,并非每个进程都会拥有整个对象的副本。
  • @Vasil:如果每个进程都需要整个数据集,并且只是在其上运行不同的操作怎么办?
【解决方案6】:

与多处理本身没有直接关系,但从您的示例来看,您似乎可以只使用 shelve 模块或类似的东西。 “big_lookup_object”真的必须完全在内存中吗?

【讨论】:

  • 好点,我没有直接比较磁盘和内存的性能。我原以为会有很大的不同,但我还没有实际测试过。
【解决方案7】:

不可以,但您可以将数据作为子进程加载,并允许它与其他子进程共享数据。见下文。

import time
import multiprocessing

def load_data( queue_load, n_processes )

    ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access. 
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data( queue_data, queue_load ):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep seperate.  
    a,b = queue_load.get()
    """

    ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target = load_data,
            args=(queue_load, n_processes))

            processes.append(p)
            p.start()   

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target = work_with_data,
        args=(queue_data, queue_load))

        processes.append(p)
        p.start()

    for i in range(n_processes)
        new_data = queue_data.get()
        stored_data.append(new_data)    

    for p in processes:
        p.join()
    print(processes)    

【讨论】:

    【解决方案8】:

    对于 Linux/Unix/MacOS 平台,forkmap 是一种快速而简单的解决方案。

    【讨论】:

      猜你喜欢
      • 2013-07-21
      • 2012-07-22
      • 2014-04-24
      • 2011-10-13
      • 2017-07-27
      • 2018-06-25
      • 2011-04-09
      • 2014-08-27
      • 2018-01-01
      相关资源
      最近更新 更多