【问题标题】:Can't pickle static method - Multiprocessing - Python不能腌制静态方法 - 多处理 - Python
【发布时间】:2014-01-14 10:25:06
【问题描述】:

我正在对我使用类的代码应用一些并行化。我知道如果没有与 Python 提供的任何其他方法不同,就不可能选择一个类方法。我找到了解决方案here。在我的代码中,我必须使用类进行并行化的部分。在这里,我发布了一个非常简单的代码,仅代表我的结构(相同,但我删除了方法内容,这是很多数学演算,对我得到的输出来说微不足道)。问题是因为我可以腌制一种方法(shepard_interpolation),但使用另一种方法(calculate_orientation_uncertainty)我得到了腌制错误。我不知道为什么会这样,或者为什么会部分起作用。

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    if func_name.startswith('__') and not func_name.endswith('__'): #deal with mangled names
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    print cls
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Sucess"       

import copy_reg
import types
from itertools import product
from multiprocessing import Pool

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class VariabilityOfGradients(object):
    def __init__(self):
        pass

    @staticmethod
    def aux():
        return "VoG - Sucess" 

    @staticmethod
    def calculate_orientation_uncertainty():
        results = []
        pool = Pool()
        for x, y in product(range(1, 5), range(1, 5)):
            result = pool.apply_async(VariabilityOfGradients.aux) 
        results.append(result.get())
        pool.close()
        pool.join()        


if __name__ == '__main__':  
    results = []
    pool = Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()   

运行时,出现“PicklingError: Can't pickle : attribute lookup builtin.function failed”。这与here 几乎相同。我看到的唯一区别是我的方法是静态的。

编辑:

我注意到,在我的 calculate_orientation_uncertainty 中,当我将函数调用为 result = pool.apply_async(VariabilityOfGradients.aux()) 时,即使用括号(在文档示例中我从未见过这个),它似乎有效。但是,当我尝试获取结果时,我收到“TypeError: 'int' object is not callable”...

任何帮助将不胜感激。提前谢谢你。

【问题讨论】:

  • 如果这不是您的代码,人们很可能会要求您发布您遇到问题的确切代码。只是一个通知。另外,您使用的是哪个 Python 版本,pickle 在版本中的行为有点不同,而且我对一个类对象进行酸洗没有问题,只要它不包含诸如套接字或活动线程之类的实例.通常你将一个线程类存储在两个类对象中,一个可能继承另一个类,或者只是在线程类中启动一个类,如果有意义的话,只需线程化“内部”类。
  • 这是我的代码。我只是删除了方法中的内容(对于问题来说无关紧要,很多数学演算是不必要的)。
  • 错误信息是什么?
  • @pceccon:关于您的编辑:如果您使用pool.apply_async(VariabilityOfGradients.aux()),那么您首先调用VariabilityOfGradients.aux(),然后将其返回值发送到pool.apply_async。这不是您想要的,因为池工作人员没有同时调用 VariabilityOfGradients.aux
  • 关于原问题:isinstance(func, types.MethodType)Falsefunc是一个静态方法,所以copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)不影响静态方法。简单的解决方法是在全局模块级别使您的静态方法成为普通函数。

标签: python class multiprocessing pickle pool


【解决方案1】:

您可以在模块级别定义一个普通函数一个静态方法。这保留了静态方法的调用语法、自省和可继承特性,同时避免了酸洗问题:

def aux():
    return "VoG - Sucess" 

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

例如,

import copy_reg
import types
from itertools import product
import multiprocessing as mp

def _pickle_method(method):
    """
    Author: Steven Bethard (author of argparse)
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Success"       

def aux():
    return "VoG - Sucess" 

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

    @staticmethod
    def calculate_orientation_uncertainty():
        pool = mp.Pool()
        results = []
        for x, y in product(range(1, 5), range(1, 5)):
            # result = pool.apply_async(aux) # this works too
            result = pool.apply_async(VariabilityOfGradients.aux, callback=results.append)
        pool.close()
        pool.join()
        print(results)


if __name__ == '__main__':  
    results = []
    pool = mp.Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()   

产量

ImD - Success
ImD - Success
ImD - Success
['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess']

顺便说一句,result.get() 会阻塞调用过程,直到pool.apply_async(例如ImageData.shepard_interpolation)调用的函数完成。所以

for _ in range(3):
    result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
    results.append(result.get())

实际上是按顺序调用ImageData.shepard_interpolation,违背了池的目的。

你可以使用

for _ in range(3):
    pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()],
                     callback=results.append)

回调函数(例如results.append)在函数完成时在调用进程的线程中被调用。它被发送一个参数——函数的返回值。因此,没有什么能阻止三个pool.apply_async 调用的快速进行,并且三个调用ImageData.shepard_interpolation 所做的工作将同时执行。

或者,在这里使用pool.map 可能更简单。

results = pool.map(ImageData.shepard_interpolation, [ImageData()]*3)

【讨论】:

  • 我知道这一点,但我只是想知道是否有一种方法可以在 Python 中做我想做的事情,而忽略这种方法。但好像我做不到。卢比。
  • 嗨,@unutbu。我能再问你一件事吗?它有效,但是,如您所见,在此示例中,我迭代了 25 次。在我的问题中,我有两个 200 x 200 的 for,我的 aux 函数有很多数学处理。正在发生的事情是,调用这个并行化比根本不使用多处理花费更多的时间。你知道为什么吗?我在这个例子中应用了正确的概念吗?谢谢。
  • 你能发布有问题的代码吗?可运行的代码会有很大帮助。
【解决方案2】:

如果你使用一个名为pathos.multiprocesssingmultiprocessing 的fork,你可以直接在multiprocessing 的map 函数中使用类和类方法。这是因为dill 被用来代替picklecPickle,而dill 在python 中几乎可以序列化任何东西。

pathos.multiprocessing 还提供了一个异步映射函数……它可以map 具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])

见: What can multiprocessing and dill do together?

和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

在此处获取代码: https://github.com/uqfoundation/pathos

pathos 也有一个异步映射 (amap) 以及 imap

【讨论】:

    【解决方案3】:

    我不确定这是否是您正在寻找的,但我的使用方式略有不同。我想使用在多个线程上运行的同一类中的类中的方法。

    我是这样实现的:

    from multiprocessing import Pool
    
    class Product(object):
    
            def __init__(self):
                    self.logger = "test"
    
            def f(self, x):
                    print(self.logger)
                    return x*x
    
            def multi(self):
                    p = Pool(5)
                    print(p.starmap(Product.f, [(Product(), 1), (Product(), 2), (Product(), 3)]))
    
    
    if __name__ == '__main__':
            obj = Product()
            obj.multi()
    

    【讨论】:

      猜你喜欢
      • 2019-08-09
      • 1970-01-01
      • 2018-04-22
      • 1970-01-01
      • 2015-11-07
      • 2016-01-05
      • 2010-12-27
      • 1970-01-01
      相关资源
      最近更新 更多