【问题标题】:call multiprocessing in class method Python在类方法Python中调用多处理
【发布时间】:2023-03-17 18:24:01
【问题描述】:

最初,我有一个类来存储一些已处理的值并通过其他方法重用这些值。

问题是当我试图将类方法分成多个进程以加快速度时,python 产生了进程,但它似乎不起作用(正如我在任务管理器中看到的只有 1 个进程正在运行)并且结果从未交付.

我做了几次搜索,发现 pathos.multiprocessing 可以代替,但我想知道标准库是否可以解决这个问题?

from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return t

a = A(2)

a.run(list(range(10)))

【问题讨论】:

  • 使用if __name__ == '__main__' 守卫。
  • 这似乎根本不需要多处理。你在使用更大的数据还是什么?或不同的方法?产生一个进程来执行一个乘法的开销是不值得的。
  • 不久前joblib 提出了一个类似的问题;使用 numpy 会更快,因为要为新进程复制和上下文切换。 stackoverflow.com/questions/44084513/…

标签: python methods multiprocessing


【解决方案1】:

您的代码失败,因为它不能 pickle 实例方法 (self.cal),当您通过将多个进程映射到 multiprocessing.Pool 来生成多个进程时,Python 会尝试这样做(嗯,有一种方法可以这样做,但它太复杂了,而且无论如何都不是非常有用) - 因为没有共享内存访问,它必须“打包”数据并将其发送到生成的进程进行解包。如果您尝试腌制 a 实例,也会发生同样的情况。

multiprocessing 包中唯一可用的共享内存访问是鲜为人知的multiprocessing.pool.ThreadPool,所以如果你真的想这样做:

from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

但这不会为您提供并行化,因为它本质上映射到您可以访问共享内存的常规线程。您应该传递类/静态方法(如果您需要调用它们)以及您希望它们使用的数据(在您的情况下为self.vl)。如果您需要跨进程共享该数据,则必须使用一些共享内存抽象,例如multiprocessing.Value,当然还要应用互斥锁。

更新

我说你可以做到(有些模块或多或少正在这样做,例如检查pathos.multiprocessing)但我认为这不值得——当你到了必须这样做的地步时欺骗你的系统做你想做的事,很可能你要么使用了错误的系统,要么你应该重新考虑你的设计。但为了了解情况,这里有一种方法可以在多处理设置中执行您想要的操作:

import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):

    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

我认为它的工作原理很容易解释,但简而言之,它将你的类的名称、它的当前状态(无信号,tho)、要调用的所需方法和调用它的参数传递给 @987654332 @ 为Pool 中的每个进程调用的函数。 Python 会自动腌制和取消腌制所有这些数据,因此 parallel_call 需要做的就是重建原始对象,在其中找到所需的方法并使用提供的参数调用它。

这样,我们只传递数据而不尝试传递活动对象,因此 Python 不会抱怨(好吧,在这种情况下,尝试将实例方法的引用添加到类参数中,看看会发生什么)和一切工作得很好。

如果您想更深入地使用“魔法”,您可以让它看起来与您的代码完全相同(创建您自己的 Pool 处理程序,从函数中获取名称并将名称发送到实际进程等)但是这应该为您的示例提供足够的功能。

但是,在您提高希望之前,请记住,这仅在共享“静态”实例(一旦您开始在多处理上下文中调用它就不会改变其初始状态的实例)时有效。如果A.cal 方法要更改vl 属性的内部状态 - 它只会影响它更改的实例(除非它在调用之间调用Pool 的主实例中发生更改)。如果你也想共享状态,你可以升级parallel_call调用后获取instance.__dict__并与方法调用结果一起返回,然后在调用端你必须更新本地@987654341 @用返回的数据改变原来的状态。这还不够——您实际上必须创建一个共享 dict 并处理所有 mutex 人员,以便所有进程同时访问它(您可以为此使用 multiprocessing.Manager)。

所以,正如我所说,麻烦多于其价值......

【讨论】:

  • 所以在我的情况下,没有解决方案可以使用类方法通过使用类中的共享内存向多个进程发送垃圾邮件,对吧?正确的方法是否只是将产卵方法带到课堂之外?
  • 有一个解决方案,我刚刚更新了我的答案。它很笨重,也不是那么好,但它可以工作......再说一次,我建议重新考虑你的设计,这样你就不必处理共享实例的状态。没有理由让自己变得更加困难......
  • 感谢您的帮助。我添加了您的代码并且它运行完美但是我认为我应该按照您的建议重新设计我的代码:)
  • @zwer 感谢无数次!这让我发疯了!在旁注中,我如何终止现在的僵尸进程?
【解决方案2】:

问题:好像没用(我在任务管理器中看到只有 1 个进程在运行) 结果永远不会交付。

只看到 1 个进程作为Pool,计算使用的进程数如下:
您给range(10) = 任务索引0..9,因此Pool 计算(10 / 4) * 4 = 8+1 = 9
启动first process 后,没有更多任务了。
使用range(32),你会看到4 process在运行。

您返回的是return t,而不是返回rs = pool.map(... 的结果。


例如,这将起作用

def cal(self, nb):
    import os
    print('pid:{} cal({})'.format(os.getpid(), nb))
    return nb * self.vl

def run(self,df):
    with mp.Pool(processes=4) as pool:
       return pool.map(self.cal, df)

if __name__ == '__main__':
    a = A(2)
    result = a.run(list(range(32)))
    print(result)

用 Python 测试:3.4.2

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-09-27
    • 2015-05-21
    • 2023-02-22
    • 2013-11-11
    • 2019-08-07
    • 1970-01-01
    • 2019-05-20
    • 2015-09-18
    相关资源
    最近更新 更多