【问题标题】:mapping a list of tuples to the multiprocessing pool object in python将元组列表映射到python中的多处理池对象
【发布时间】:2018-06-14 18:33:51
【问题描述】:

我在使用以下格式的代码时遇到问题,并假设错误与我尝试访问每个元组中元素的方式有关。

from numberer import numberify
from sys import argv
infile=argv[1]
from multiprocessing import Pool
pool=Pool(15)
import os

def chunker(fob):
    chunkbegin=0
    filesize=os.stat(fob.name).st_size
    while chunkbegin < filesize:
        chunkend=chunkbegin+100000
        fob.seek(chunkend)
        fob.readline()
        chunkend=fob.tell()
        yield (chunkbegin,chunkend)
        chunkbegin=chunkend

def run(tup, fob):
    fob.seek(tup[0])
    length=int(tup[1])-int(tup[0])
    lines=fob.readlines(length)
    for line in lines:
        print(line)

fob=open(infile)
chunks=[x for x in chunker(fob)]
pool.map(run, (chunks, fob))

确切的错误是:

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>

1) 所以当 map 函数将元组映射到函数时;我假设这些元素应该像普通元组一样被调用? IE只有一个索引?

2) 我传递给函数 run: 的元素块是一个元组列表: chunks=[(0,100000),(100000,200000)....] 由生成器分块器创建。

谢谢。

【问题讨论】:

    标签: python dictionary multiprocessing tuples pool


    【解决方案1】:

    map 方法接受一个可迭代的参数。可迭代的每个元素都传递给run 的一个实例。由于您的可迭代对象是元组(chunks, fob),这将运行两个任务,在一个任务中调用run(chunks),在另一个任务中调用run(fob)


    我认为您想要做的是为chunks 中的每个chunk 运行一个任务,调用run(chunk, fob)

    因此,首先,您需要一个迭代器,每个块产生一次 (chunk, fob),例如,((chunk, fob) for chunk in chunks)


    但这仍然行不通,因为它将使用单个参数调用 run,即 2 元组 (chunk, fob),而不是使用两个参数。您可以通过重写或包装 run 来解决此问题,以采用单个 2 元组而不是两个单独的参数,或者您可以只使用 starmap 而不是 map,它会为您完成包装。


    但这仍然行不通。你试图在进程之间传递一个打开的文件对象,而multiprocessing 不能这样做。

    由于您使用fork 方法,有时您可以从父级继承文件对象而不是传递它,但细节很复杂,您确实需要阅读Programming guidelines for @ 987654341@ 并了解文件描述符继承在 Unix 上的工作原理。

    由于您希望每个孩子都拥有自己独立的文件对象副本,以便他们都可以在其中使用seek,因此最简单的解决方案是只传递文件名并让他们自己拥有open

    def run(tup, path):
        with open(path) as fob:
            fob.seek(tup[0])
            length=int(tup[1])-int(tup[0])
            lines=fob.readlines(length)
            for line in lines:
                print(line)
    
    fob = open(infile)
    chunks = [x for x in chunker(fob)]
    args = ((chunk, infile) for chunk in chunks)
    pool.starmap(run, args)
    

    同时,既然我们确定我们不依赖fork 行为,编写代码以使用任何启动方法可能是个好主意。这意味着将顶级代码放入__main__ 块中。而且,在我们处理它的同时,让我们确保在完成文件后关闭它:

    # imports
    # function definitions
    if __name__ == '__main__':
        infile = argv[1]
        pool = Pool(15)
        with open(infile) as fob:
            chunks = [x for x in chunker(fob)]
        args = ((chunk, infile) for chunk in chunks)
        pool.starmap(run, args)
    

    您的代码中可能还有其他错误,但我认为这会耗尽multiprocessing 的错误。

    【讨论】:

    • 哇,我刚刚经历了这个。但是您先生/女士是个天才,而且也以光速...
    • 完美的位置。我的另一个查询是关于如何将结果返回到对每个进程都有贡献的全局共享列表。例如最好简单地声明全局变量,然后在每次运行中追加......还是 mp 无法处理?
    • @LewisMacLachlan 如果可能的话,理想的解决方案是让子任务返回结果,或者将它们传递给队列,并让父任务建立列表。如果没有,如果您可以使用比任意 Python 对象列表更低级别的东西,例如整数数组或复杂的 numpy 数组或 ctypes 结构,请使用共享内存和锁。如果没有,您通常使用 Manager 对象,虽然效率很低,但至少它非常容易编写(除非您遇到不常见但不太罕见的微妙同步问题)。
    • @LewisMacLachlan 如果你通读了多处理模块文档(我知道它很大,但它的组织方式你几乎必须至少直接阅读前半部分,然后搜索下半部分的相关内容参考资料,至少是第一次……),它涵盖了所有选项以及它们之间的权衡。
    猜你喜欢
    • 2020-07-24
    • 2017-04-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-16
    • 2015-07-14
    • 2016-07-31
    相关资源
    最近更新 更多