【问题标题】:Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map()使用多处理 Pool.map() 时无法腌制 <type 'instancemethod'>
【发布时间】:2010-12-21 11:28:14
【问题描述】:

我正在尝试使用multiprocessingPool.map() 函数同时分配工作。当我使用以下代码时,它工作正常:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

但是,当我在更面向对象的方法中使用它时,它就不起作用了。它给出的错误信息是:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

当以下是我的主程序时会发生这种情况:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

以下是我的someClass类:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

任何人都知道问题可能是什么,或者解决它的简单方法?

【问题讨论】:

  • 如果 f 是嵌套函数,则会出现类似的错误PicklingError: Can't pickle &lt;class 'function'&gt;: attribute lookup builtins.function failed

标签: python multithreading multiprocessing pickle pool


【解决方案1】:

pathos.multiprocessing 为我工作。

它有一个pool 方法并序列化所有与multiprocessing 不同的东西

import pathos.multiprocessing as mp
pool = mp.Pool(processes=2) 

【讨论】:

    【解决方案2】:

    我遇到了同样的问题,但发现有一个 JSON 编码器可用于在进程之间移动这些对象。

    from pyVmomi.VmomiSupport import VmomiJSONEncoder
    

    使用它来创建您的列表:

    jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)
    

    然后在映射函数中,使用this来恢复对象:

    pfVmomiObj = json.loads(jsonSerialized)
    

    【讨论】:

      【解决方案3】:

      上面parisjohn 的解决方案对我很有效。此外,代码看起来干净且易于理解。在我的例子中,有几个函数可以使用 Pool 调用,所以我在下面修改了 parisjohn 的代码。我做了 call 以便能够调用多个函数,并且函数名在来自go() 的参数dict 中传递:

      from multiprocessing import Pool
      class someClass(object):
          def __init__(self):
              pass
      
          def f(self, x):
              return x*x
      
          def g(self, x):
              return x*x+1    
      
          def go(self):
              p = Pool(4)
              sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
              print sc
      
          def __call__(self, x):
              if x["func"]=="f":
                  return self.f(x["v"])
              if x["func"]=="g":
                  return self.g(x["v"])        
      
      sc = someClass()
      sc.go()
      

      【讨论】:

        【解决方案4】:

        更新:在撰写本文时,namedTuples 是可选的(从 python 2.7 开始)

        这里的问题是子进程无法导入对象的类 - 在这种情况下是类 P-,在多模型项目的情况下,类 P 应该可以在子进程的任何地方导入习惯了

        一种快速的解决方法是通过将其影响到 globals() 来使其可导入

        globals()["P"] = P
        

        【讨论】:

          【解决方案5】:

          为什么不使用单独的函数?

          def func(*args, **kwargs):
              return inst.method(args, kwargs)
          
          print pool.map(func, arr)
          

          【讨论】:

            【解决方案6】:

            在这个简单的情况下,someClass.f 没有从类继承任何数据,也没有将任何东西附加到该类,一个可能的解决方案是分离出f,因此它可以被腌制:

            import multiprocessing
            
            
            def f(x):
                return x*x
            
            
            class someClass(object):
                def __init__(self):
                    pass
            
                def go(self):
                    pool = multiprocessing.Pool(processes=4)       
                    print pool.map(f, range(10))
            

            【讨论】:

              【解决方案7】:

              所有这些解决方案都很丑陋,因为除非您跳出标准库,否则多处理和酸洗会受到破坏和限制。

              如果使用multiprocessing 的fork 称为pathos.multiprocesssing,则可以直接在多处理的map 函数中使用类和类方法。这是因为dill 被用来代替picklecPickle,而dill 在python 中几乎可以序列化任何东西。

              pathos.multiprocessing 还提供了一个异步映射函数……它可以map 具有多个参数的函数(例如map(math.pow, [1,2,3], [4,5,6])

              见: What can multiprocessing and dill do together?

              和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

              >>> import pathos.pools as pp
              >>> p = pp.ProcessPool(4)
              >>> 
              >>> def add(x,y):
              ...   return x+y
              ... 
              >>> x = [0,1,2,3]
              >>> y = [4,5,6,7]
              >>> 
              >>> p.map(add, x, y)
              [4, 6, 8, 10]
              >>> 
              >>> class Test(object):
              ...   def plus(self, x, y): 
              ...     return x+y
              ... 
              >>> t = Test()
              >>> 
              >>> p.map(Test.plus, [t]*4, x, y)
              [4, 6, 8, 10]
              >>> 
              >>> p.map(t.plus, x, y)
              [4, 6, 8, 10]
              

              明确地说,您可以一开始就完全按照自己的意愿去做,如果您愿意,也可以通过解释器来做。

              >>> import pathos.pools as pp
              >>> class someClass(object):
              ...   def __init__(self):
              ...     pass
              ...   def f(self, x):
              ...     return x*x
              ...   def go(self):
              ...     pool = pp.ProcessPool(4)
              ...     print pool.map(self.f, range(10))
              ... 
              >>> sc = someClass()
              >>> sc.go()
              [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
              >>> 
              

              在此处获取代码: https://github.com/uqfoundation/pathos

              【讨论】:

              • 您能否根据 pathos.pp 更新此答案,因为 pathos.multiprocessing 不再存在?
              • 我是pathos 作者。您所指的版本是几年前的。试试github上的版本,可以用pathos.pp或者github.com/uqfoundation/ppft
              • github.com/uqfoundation/pathos。 @SaheelGodhane:一个新版本早就应该发布了,但应该很快就会发布。
              • 首先是pip install setuptools,然后是pip install git+https://github.com/uqfoundation/pathos.git@master。这将获得适当的依赖关系。新版本即将发布……现在pathos 中的几乎所有内容也可以在 Windows 上运行,并且与3.x 兼容。
              • @Rika:是的。可以使用阻塞、迭代和异步映射。
              【解决方案8】:

              问题是多处理必须腌制事物以将它们吊在进程之间,并且绑定的方法是不可腌制的。解决方法(无论您是否认为它“简单”;-)是将基础架构添加到您的程序中以允许对此类方法进行腌制,并使用 copy_reg 标准库方法注册它。

              例如,Steven Bethard 对this thread 的贡献(接近线程的末尾)展示了一种完全可行的方法来允许通过copy_reg 进行方法酸洗/取消酸洗。

              【讨论】:

              • 太好了 - 谢谢。无论如何,似乎已经取得了一些进展:使用pastebin.ca/1693348 的代码我现在得到一个 RuntimeError: maximum recursion depth exceeded。我环顾四周,一篇论坛帖子建议将最大深度增加到 1500(默认为 1000),但我对此并不满意。老实说,我看不到(至少我的代码的)哪一部分可能会递归失控,除非由于某种原因代码在循环中酸洗和解酸,因为我为了做一些细微的改变Steven 的代码 OO'd?
              • 你的_pickle_method返回self._unpickle_method,绑定方法;所以当然 pickle 现在会尝试腌制 - 它会按照您的指示进行:通过递归调用 _pickle_method。 IE。以这种方式OOing 代码,您不可避免地引入了无限递归。我建议回到 Steven 的代码(在不合适的时候不要崇拜 OO:Python 中的许多事情最好以更实用的方式完成,这就是其中之一)。
              • For the super super lazy,请参阅唯一不麻烦发布实际未损坏代码的答案...
              • 另一种修复/规避酸洗问题的方法是使用莳萝,见我的回答stackoverflow.com/questions/8804830/…
              【解决方案9】:

              您还可以在someClass() 中定义一个__call__() 方法,该方法调用someClass.go(),然后将someClass() 的实例传递给池。这个对象是可腌制的,它工作正常(对我来说)......

              class someClass(object):
                 def __init__(self):
                     pass
                 def f(self, x):
                     return x*x
              
                 def go(self):
                    p = Pool(4)
                    sc = p.map(self, range(4))
                    print sc
              
                 def __call__(self, x):   
                   return self.f(x)
              
              sc = someClass()
              sc.go()
              

              【讨论】:

                【解决方案10】:

                一个潜在的简单解决方案是改用multiprocessing.dummy。这是多处理接口的基于线程的实现,在 Python 2.7 中似乎没有这个问题。我在这里没有太多经验,但是这个快速的导入更改允许我在类方法上调用 apply_async。

                multiprocessing.dummy上的一些好资源:

                https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

                http://chriskiehl.com/article/parallelism-in-one-line/

                【讨论】:

                  【解决方案11】:

                  您可以使用另一种快捷方式,尽管它可能效率低下,具体取决于您的类实例中的内容。

                  正如大家所说的那样,问题在于multiprocessing 代码必须腌制它发送到它已启动的子进程的东西,而腌制器不执行实例方法。

                  但是,您可以不发送实例方法,而是将实际的类实例以及要调用的函数的名称发送到普通函数,然后使用getattr 调用实例方法,从而创建Pool 子进程中的绑定方法。这类似于定义__call__ 方法,只是可以调用多个成员函数。

                  从他的答案中窃取@EricH.的代码并对其进行一些注释(我重新输入了它,因此所有名称都发生了变化等等,出于某种原因,这似乎比剪切和粘贴更容易:-))以说明所有魔法:

                  import multiprocessing
                  import os
                  
                  def call_it(instance, name, args=(), kwargs=None):
                      "indirect caller for instance methods and multiprocessing"
                      if kwargs is None:
                          kwargs = {}
                      return getattr(instance, name)(*args, **kwargs)
                  
                  class Klass(object):
                      def __init__(self, nobj, workers=multiprocessing.cpu_count()):
                          print "Constructor (in pid=%d)..." % os.getpid()
                          self.count = 1
                          pool = multiprocessing.Pool(processes = workers)
                          async_results = [pool.apply_async(call_it,
                              args = (self, 'process_obj', (i,))) for i in range(nobj)]
                          pool.close()
                          map(multiprocessing.pool.ApplyResult.wait, async_results)
                          lst_results = [r.get() for r in async_results]
                          print lst_results
                  
                      def __del__(self):
                          self.count -= 1
                          print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
                  
                      def process_obj(self, index):
                          print "object %d" % index
                          return "results"
                  
                  Klass(nobj=8, workers=3)
                  

                  输出显示,实际上,构造函数被调用了一次(在原始 pid 中),而析构函数被调用了 9 次(每个副本制作一次 = 每个 pool-worker-process 需要 2 或 3 次,再加上一次在原始过程中)。这通常没问题,因为在这种情况下,默认pickler 会复制整个实例并(半)秘密地重新填充它——在这种情况下,这样做:

                  obj = object.__new__(Klass)
                  obj.__dict__.update({'count':1})
                  

                  ——这就是为什么即使在三个工作进程中调用了八次析构函数,它每次都从 1 倒数到 0 ——当然,这样你仍然会遇到麻烦。如有需要,您可以提供自己的__setstate__

                      def __setstate__(self, adict):
                          self.count = adict['count']
                  

                  以这种情况为例。

                  【讨论】:

                  • 这是迄今为止解决这个问题的最佳答案,因为它最容易应用于不可腌制的默认行为
                  【解决方案12】:

                  尽管 Steven Bethard 的解决方案存在一些限制:

                  当你将你的类方法注册为一个函数时,你的类的析构函数会在每次你的方法处理完成时被调用。因此,如果您有 1 个类的实例调用其方法的 n 次,则成员可能会在 2 次运行之间消失,并且您可能会收到一条消息 malloc: *** error for object 0x...: pointer being freed was not allocated(例如打开成员文件)或 pure virtual method called, terminate called without an active exception(这意味着比成员的生命周期我使用的对象比我想象的要短)。我在处理大于池大小的 n 时得到了这个。这是一个简短的例子:

                  from multiprocessing import Pool, cpu_count
                  from multiprocessing.pool import ApplyResult
                  
                  # --------- see Stenven's solution above -------------
                  from copy_reg import pickle
                  from types import MethodType
                  
                  def _pickle_method(method):
                      func_name = method.im_func.__name__
                      obj = method.im_self
                      cls = method.im_class
                      return _unpickle_method, (func_name, obj, cls)
                  
                  def _unpickle_method(func_name, obj, cls):
                      for cls in cls.mro():
                          try:
                              func = cls.__dict__[func_name]
                          except KeyError:
                              pass
                          else:
                              break
                      return func.__get__(obj, cls)
                  
                  
                  class Myclass(object):
                  
                      def __init__(self, nobj, workers=cpu_count()):
                  
                          print "Constructor ..."
                          # multi-processing
                          pool = Pool(processes=workers)
                          async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
                          pool.close()
                          # waiting for all results
                          map(ApplyResult.wait, async_results)
                          lst_results=[r.get() for r in async_results]
                          print lst_results
                  
                      def __del__(self):
                          print "... Destructor"
                  
                      def process_obj(self, index):
                          print "object %d" % index
                          return "results"
                  
                  pickle(MethodType, _pickle_method, _unpickle_method)
                  Myclass(nobj=8, workers=3)
                  # problem !!! the destructor is called nobj times (instead of once)
                  

                  输出:

                  Constructor ...
                  object 0
                  object 1
                  object 2
                  ... Destructor
                  object 3
                  ... Destructor
                  object 4
                  ... Destructor
                  object 5
                  ... Destructor
                  object 6
                  ... Destructor
                  object 7
                  ... Destructor
                  ... Destructor
                  ... Destructor
                  ['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
                  ... Destructor
                  

                  __call__ 方法不是那么等效,因为 [None,...] 是从结果中读取的:

                  from multiprocessing import Pool, cpu_count
                  from multiprocessing.pool import ApplyResult
                  
                  class Myclass(object):
                  
                      def __init__(self, nobj, workers=cpu_count()):
                  
                          print "Constructor ..."
                          # multiprocessing
                          pool = Pool(processes=workers)
                          async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
                          pool.close()
                          # waiting for all results
                          map(ApplyResult.wait, async_results)
                          lst_results=[r.get() for r in async_results]
                          print lst_results
                  
                      def __call__(self, i):
                          self.process_obj(i)
                  
                      def __del__(self):
                          print "... Destructor"
                  
                      def process_obj(self, i):
                          print "obj %d" % i
                          return "result"
                  
                  Myclass(nobj=8, workers=3)
                  # problem !!! the destructor is called nobj times (instead of once), 
                  # **and** results are empty !
                  

                  所以这两种方法都不令人满意...

                  【讨论】:

                  • 你得到了None,因为你对__call__的定义缺少return:它应该是return self.process_obj(i)
                  • @Eric 我遇到了同样的错误,我尝试了这个解决方案,但是我开始收到新错误“cPickle.PicklingError: Can't pickle : attribute lookup 内置.function失败”。你知道这背后的可能原因是什么吗?
                  【解决方案13】:

                  您还可以在someClass() 中定义__call__() 方法,该方法调用someClass.go(),然后将someClass() 的实例传递给池。这个对象是可腌制的,它工作正常(对我来说)......

                  【讨论】:

                  • 这比 Alex Martelli 提出的技术要容易得多,但您只能将每个类发送一个方法到您的多处理池。
                  • 另一个需要牢记的细节是只有对象(类实例)会被腌制,而不是类本身。因此,如果您更改了任何类属性的默认值,这些更改将不会传播到不同的进程。解决方法是确保您的函数需要的所有内容都存储为实例属性。
                  • @dorvak 你能用__call__() 举一个简单的例子吗?我认为您的答案可能是更清晰的答案-我正在努力理解这个错误,而且我第一次来看电话。顺便说一句,这个答案也有助于澄清多处理的作用:[stackoverflow.com/a/20789937/305883]
                  • 你能举个例子吗?
                  • 张贴了一个new answer(目前在这个下面),其中包含示例代码。
                  猜你喜欢
                  • 2014-09-29
                  • 1970-01-01
                  • 2012-02-06
                  • 2014-04-27
                  • 2016-05-23
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  相关资源
                  最近更新 更多