【问题标题】:Safely write to file in parallel with pathos.multiprocessing与 pathos.multiprocessing 并行安全地写入文件
【发布时间】:2015-12-10 16:10:29
【问题描述】:

众所周知,pathos.multiprocessing 优于 Python 中的 multiprocessing 库,因为前者使用 dill 而不是 pickle,并且可以序列化更广泛的函数和其他内容。

但是在使用pathospool.map() 结果逐行写入文件时,会遇到一些麻烦。如果ProcessPool 中的所有进程将结果逐行写入单个文件,它们会相互干扰同时写入一些行并破坏工作。在使用普通的multiprocessing 包时,我能够让进程写入自己的单独文件,以当前进程ID命名,如下所示:

example_data = range(100)
def process_point(point):
    output = "output-%d.gz" % mpp.current_process().pid
    with gzip.open(output, "a+") as fout:
        fout.write('%d\n' % point**2)

那么,这段代码运行良好:

import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)

但是这段代码没有:

from pathos import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)

并抛出AttributeError:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-10-a6fb174ec9a5> in <module>()
----> 1 pool.map(process_point, example_data)

/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in map(self, func, iterable, chunksize)
    128         '''
    129         assert self._state == RUN
--> 130         return self.mapAsync(func, iterable, chunksize).get()
    131
    132     def imap(self, func, iterable, chunksize=1):

/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in get(self, timeout)
    371             return self._value
    372         else:
--> 373             raise self._value
    374
    375     def _set(self, i, obj):

AttributeError: 'module' object has no attribute 'current_process'

pathos 中没有current_process(),我找不到类似的东西。有什么想法吗?

【问题讨论】:

    标签: python python-multiprocessing pathos


    【解决方案1】:

    我是pathos 作者。虽然您的答案适用于这种情况,但最好在 pathos 中使用 multiprocessing 的分支,该分支位于相当迟钝的位置:pathos.helpers.mp

    这为您提供了与multiprocessing 的一对一映射,但具有更好的序列化。因此,您将使用pathos.helpers.mp.current_process

    抱歉,它既没有记录也不明显……我应该至少改进这两个问题中的一个。

    【讨论】:

    • 谢谢!这似乎最终起作用:pathos.helpers.mp.currentProcess().getPid()
    【解决方案2】:

    这个简单的技巧似乎可以完成这项工作:

    import multiprocessing as mp
    from pathos import multiprocessing as pathos_mp
    import gzip
    
    example_data = range(100)
    def process_point(point):
        output = "output-%d.gz" % mp.current_process().pid
        with gzip.open(output, "a+") as fout:
            fout.write('%d\n' % point**2)
    
    pool = pathos_mp.Pool(8)
    pool.map(process_point, example_data)
    

    换句话说,可以使用pathos进行并行计算,使用普通的multiprocessing包来获取当前进程的id,这样就可以正常工作了!

    【讨论】:

      猜你喜欢
      • 2010-12-21
      • 1970-01-01
      • 2012-11-06
      • 2020-02-18
      • 2019-07-02
      • 2013-09-26
      • 2015-06-26
      • 2012-09-23
      • 1970-01-01
      相关资源
      最近更新 更多