【问题标题】:Creating objects in parallel with a multiprocessing pool?与多处理池并行创建对象?
【发布时间】:2013-04-02 09:09:00
【问题描述】:

Python 2.7.3

我有一个包含数千个数据文件的文件夹。每个数据文件都被提供给构造函数并进行大量处理。现在我正在遍历文件并按顺序处理它们:

class Foo:
    def __init__(self,file):
        self.bar = do_lots_of_stuff_with_numpy_and_scipy(file)

def do_lots_of_stuff_with_numpy_and_scipy(file):
    pass

def get_foos(dir):
    return [Foo(os.path.join(dir,file)) for file in os.listdir(dir)]

这很好用,但是太慢了。我想并行执行此操作。我试过了:

def parallel_get_foos(dir):
    p = Pool()
    foos = p.map(Foo, [os.path.join(dir,file) for file in os.listdir(dir)])
    p.close()
    p.join()
    return foos

if __name__ == "__main__":
    foos = parallel_get_foos(sys.argv[1])

但它只是有很多错误:

Process PoolWorker-7:
Traceback (most recent call last):
  File "/l/python2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/l/python2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/l/python2.7/lib/python2.7/multiprocessing/pool.py", line 99, in worker
    put((job, i, result))
  File "/l/python2.7/lib/python2.7/multiprocessing/queues.py", line 390, in put
    return send(obj)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我尝试过创建一个函数来返回对象,例如:

def get_foo(file):
    return Foo(file)

def parallel_get_foos(dir):
    ...
    foos = p.map(get_foo, [os.path.join(dir,file) for file in os.listdir(dir)])
    ...

但正如预期的那样,我得到了同样的错误。

我已经阅读了大量类似的主题,试图解决类似这样的问题,但没有一个解决方案对我有帮助。所以我很感激任何帮助!

编辑:

Bakuriu 正确地推测我在我的 do_lots_of_stuff 方法中定义了一个非顶级函数。具体来说,我是这样做的:

def fit_curve(data,degree):
    """Fits a least-square polynomial function to the given data."""
    sorted = data[data[:,0].argsort()].T
    coefficients = numpy.polyfit(sorted[0],sorted[1],degree)
    def eval(val,deg=degree):
        res = 0
        for coefficient in coefficients:
            res += coefficient*val**deg
            deg -= 1
        return res
    return eval

有没有办法让这个函数可以腌制?

【问题讨论】:

  • 您使用get_foo 函数的方法实际上效果很好。此外,做p.map(Foo, [ ... ]) 工作。你唯一不能做的就是腌制一个特定的方法(比如__init__)。
  • 不适合我 :( 顺便说一句,我使用的是 Python 2.7。
  • 您能否展示一下do_lots_of_stuff_with_numpy_and_scipy 的组织方式或多或少?尤其是在哪里以及如何使用 fit_curve 函数?您不必写下所有细节,但要明确与该功能使用相关的主要控制流程,以便我们了解哪种替代方案可能适合您的情况。作为一个疯狂的猜测,你可以简单地拥有一个像 def _eval(coefficients, val, degree): 这样实现 eval 函数的函数,让 fit_curve 返回 coefficients 并将函数调用替换为 _eval(coefs,val,degree)

标签: python parallel-processing multiprocessing pickle


【解决方案1】:

您正在做的事情(至少,您在示例中显示的内容)实际上工作正常:

$mkdir TestPool
$cd TestPool/
$for i in {1..100}
> do
>     touch "test$i"
> done
$ls
test1    test18  test27  test36  test45  test54  test63  test72  test81  test90
test10   test19  test28  test37  test46  test55  test64  test73  test82  test91
test100  test2   test29  test38  test47  test56  test65  test74  test83  test92
test11   test20  test3   test39  test48  test57  test66  test75  test84  test93
test12   test21  test30  test4   test49  test58  test67  test76  test85  test94
test13   test22  test31  test40  test5   test59  test68  test77  test86  test95
test14   test23  test32  test41  test50  test6   test69  test78  test87  test96
test15   test24  test33  test42  test51  test60  test7   test79  test88  test97
test16   test25  test34  test43  test52  test61  test70  test8   test89  test98
test17   test26  test35  test44  test53  test62  test71  test80  test9   test99
$vi test_pool_dir.py
$cat test_pool_dir.py 
import os
import multiprocessing

class Foo(object):
    def __init__(self, fname):
        self.fname = fname   #or your calculations


def parallel_get_foos(directory):
    p = multiprocessing.Pool()
    foos = p.map(Foo, [os.path.join(directory, fname) for fname in os.listdir(directory)])
    p.close()
    p.join()
    return foos

if __name__ == '__main__':
    foos = parallel_get_foos('.')
    print len(foos)   #expected 101: 100 files plus this script

$python test_pool_dir.py 
101

版本信息:

$python --version
Python 2.7.3
$uname -a
Linux giacomo-Acer 3.2.0-39-generic #62-Ubuntu SMP Thu Feb 28 00:28:53 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

我的猜测是你没有完全按照你在你展示的代码示例中展示的那样做。例如,我在执行此操作时收到与您类似的错误:

>>> import pickle
>>> def test():
...     def test2(): pass
...     return test2
... 
>>> import multiprocessing
>>> p = multiprocessing.Pool()
>>> p.map(test(), [1,2,3])
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

这很明显,因为:

>>> pickle.dumps(test())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 748, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <function test2 at 0x7fad15fc2938>: it's not found as __main__.test2

pickle 的文档指出:

可以腌制以下几种:

  • NoneTrueFalse
  • 整数、长整数、浮点数、复数
  • 普通和 Unicode 字符串
  • tuples、lists、sets 以及仅包含可腌制对象的字典
  • 函数在模块的顶层定义
  • 内置函数在模块的顶层定义
  • 在模块顶层定义的类
  • 此类的实例,其__dict__ 或调用__getstate__() 的结果是可腌制的(请参阅腌制协议部分 了解详情)。

然后继续:

请注意,函数(内置的和用户定义的)是由“完全 限定”名称引用,而不是按值。这意味着只有 函数名称与模块名称一起被腌制 函数的定义。既不是函数的代码,也不是它的任何代码 函数属性被腌制。因此定义模块必须是 可在 unpickling 环境中导入,并且模块必须包含 命名对象,否则将引发异常。

【讨论】:

  • 是的,你完全正确。在我的类中,我根据传入的文件的内容定义了一个函数,并返回该函数。我没有意识到这个功能会成为我问题的根源。那么我就不可能完成我想做的事吗?
猜你喜欢
  • 1970-01-01
  • 2012-01-23
  • 1970-01-01
  • 2017-11-17
  • 2022-11-30
  • 2013-10-26
  • 1970-01-01
  • 1970-01-01
  • 2012-05-05
相关资源
最近更新 更多