【问题标题】:Ensuring correct order when collecting results from multiprocessing从多处理收集结果时确保正确的顺序
【发布时间】:2014-02-26 10:15:24
【问题描述】:

我有一个多处理脚本,它像这样遍历字典:

jobs = []
for key, val in datadict.items():
    jobs.append(pool.apply_async(worker, (val))

pool.close()
pool.join()

jobs 是结果对象的列表,(调用get() 将给出数据列表)

我想对结果进行格式化,使它们成为与输入字典具有相同键和顺序的字典。

我想在所有工作完成后简单地这样做:

result = {key: jobs[key].get() for key, val in datadict} 

这很有效,因为datadict 中的键是整数(因此可用于索引作业列表)。 但后来我突然想到,结果工作列表可能不一定按照相同的顺序(因为它是创建的)——这是真的吗? (我希望订单可能会变得混乱,因为一个过程可能比另一个过程完成得更快,等等)

所以我决定将datadictkey 传递给worker 函数,然后将结果作为元组再次返回。这样调用jobs[index].get() 将返回一个元组,其中第一个值是键(刚刚通过函数),第二个值是实际结果

然后我可以像这样创建一个字典:

result = dict([job.get() for job in jobs])

所以最终的脚本是:

def worker(val, key):        
    res = "Do something to val"
    return (key, res)

if __name__ == "__main__":
    jobs = []
    for key, val in datadict.items():
        jobs.append(pool.apply_async(worker, (val, key))

    pool.close()
    pool.join()

    result = dict([job.get() for job in jobs])

但这是最好的方法吗?有几点让我很烦恼:

  1. 到目前为止,jobs 列表的结果顺序与输入顺序匹配
  2. “通过”函数传递值似乎有点愚蠢(即什么都不做)

【问题讨论】:

    标签: python python-2.7 dictionary multiprocessing


    【解决方案1】:

    显式排序 datadict 字典键,并对其进行迭代。

    import multiprocessing
    
    def worker(val):
        res = "Do something to val {}".format(val)
        return res
    
    if __name__ == "__main__":
        datadict = {1: 'val1', 2: 'val2', 0: 'val0'}
        jobs = []
        pool = multiprocessing.Pool()
        for key in sorted(datadict): # <------------
            jobs.append(pool.apply_async(worker, (datadict[key],)))
        pool.close()
        pool.join()
        result = [job.get() for job in jobs]
        print(result)
        # ['Do something to val 0', 'Do something to val 1', 'Do something to val 2']
    

    顺便说一句,如果worker 只接受一个参数,你可以使用Pool.map

    if __name__ == "__main__":
        datadict = {1: 'val1', 2: 'val2', 0: 'val0'}
        jobs = []
        pool = multiprocessing.Pool()
        result = pool.map(worker, sorted(datadict)) # <---
        pool.close()
        pool.join()
    

    【讨论】:

      【解决方案2】:

      dict 容器不保证任何特定的顺序。如果您想让事情井井有条,您需要将结果存储在list

      result = [job.get() for job in jobs]
      

      或者您可以使用维护插入顺序的OrderedDict

      result = OrderedDict([job.key, job.get() for job in jobs])
      

      第二种解决方案需要一种从作业中获取密钥的方法。

      更新:

      如果 order 是 key 给出的,那么你可以通过这个属性对结果进行排序(job 需要知道它的 key):

      results = [job.get() for job in jobs]
      results = sorted(results, key=attrgetter('key'))
      

      或者如果你需要dict:

      results = [job.get() for job in jobs]
      results = OrderedDict([job.key, job for job in sorted(results, key=attrgetter('key'))])
      

      【讨论】:

      • 顺序由键给出...也许“顺序”是错误的词。我需要将正确的结果分配给正确的键。
      【解决方案3】:

      作业可能会乱序完成,但这不会改变您的jobs 列表的顺序。但是,您通过循环 datadict.items() 来填充 jobs;这将它们置于任意顺序,因为字典不保持顺序。

      将密钥放入jobs 将是做你想做的事情的一种方法:

      jobs = []
      for key, val in datadict.items():
          jobs.append((key, pool.apply_async(worker, (val,)))
      
      pool.close()
      pool.join()
      
      result = {key: job.get() for key, job in jobs} 
      

      【讨论】:

      • 这是如何工作的?假设我的字典中有两个项目,如果第二个项目先完成,这不会在第一个项目之前附加到工作列表中(因此在错误的位置)?
      • 您在for 循环中自己将结果对象附加到jobs。之后没有重新排列它们。在结果对象上调用get 会等待实际结果到达,但不会更改列表的顺序。
      猜你喜欢
      • 1970-01-01
      • 2013-08-09
      • 2021-11-19
      • 1970-01-01
      • 2015-08-02
      • 1970-01-01
      • 2014-11-28
      • 2020-01-05
      • 1970-01-01
      相关资源
      最近更新 更多