【问题标题】:Writing to file in Pool multiprocessing (Python 2.7)在池多处理中写入文件(Python 2.7)
【发布时间】:2018-03-18 11:08:46
【问题描述】:

我正在做很多计算,将结果写入文件。使用多处理我正在尝试并行计算。

这里的问题是我正在写入一个所有工作人员也在写入的输出文件。我对多处理很陌生,想知道如何使它工作。

代码的一个非常简单的概念如下:

from multiprocessing import Pool

fout_=open('test'+'.txt','w')

def f(x):
    fout_.write(str(x) + "\n")


if __name__ == '__main__':
    p = Pool(5)
    p.map(f, [1, 2, 3])

我想要的结果是一个文件:

1 2 3

但是现在我得到一个空文件。有什么建议么? 我非常感谢任何帮助:)!

【问题讨论】:

  • 要么在访问文件时需要加锁,要么为每个工作人员使用一个文件,稍后依次汇总
  • 嗨,我已经尝试了您的代码,没有进行任何修改,我得到了一个文件test.txt,其中写入了1, 2, 3。我无法重现您的错误
  • 我正在使用 Python 3.6,如果这很重要的话
  • nj2237,很奇怪。我在 python 2.7 和 3.6 中都有问题(只有一个空文件)。
  • 带有 'w' 标志的每个工作人员都会覆盖文件...尝试将其更改为 'a' 标志。此外,您要么需要让每个工作人员写入唯一文件并稍后将它们组合起来,要么需要多进程计算然后写入单个文件。第三种选择是写入一些具有并发访问权限的数据库。

标签: python python-2.7 multiprocessing pool


【解决方案1】:

您不应该让所有工作人员/进程写入单个文件。它们都可以从一个文件中读取(由于工作人员等待其中一个完成读取,这可能会导致速度变慢),但是写入同一个文件会导致冲突和潜在的损坏。

就像在 cmets 中所说的那样,改为写入单独的文件,然后在一个进程中将它们组合成一个。这个小程序是根据你帖子里的程序来说明的:

from multiprocessing import Pool

def f(args):
    ''' Perform computation and write
    to separate file for each '''
    x = args[0]
    fname = args[1]
    with open(fname, 'w') as fout:
        fout.write(str(x) + "\n")

def fcombine(orig, dest):
    ''' Combine files with names in 
    orig into one file named dest '''
    with open(dest, 'w') as fout:
        for o in orig:
            with open(o, 'r') as fin:
                for line in fin:
                    fout.write(line)

if __name__ == '__main__':
    # Each sublist is a combination
    # of arguments - number and temporary output
    # file name
    x = range(1,4)
    names = ['temp_' + str(y) + '.txt' for y in x]
    args = list(zip(x,names))

    p = Pool(3)
    p.map(f, args)

    p.close()
    p.join()

    fcombine(names, 'final.txt')

它为每个参数组合运行f,在这种情况下是 x 的值和临时文件名。它使用参数组合的嵌套列表,因为pool.map 不接受多个参数。还有其他方法可以解决这个问题,尤其是在较新的 Python 版本上。

对于每个参数组合和池成员,它会创建一个单独的文件,并将输出写入该文件。原则上您的输出会更长,您可以简单地将另一个计算它的函数添加到 f 函数。此外,不需要将 Pool(5) 用于 3 个参数(尽管我假设无论如何只有三个工作人员处于活动状态)。

close()join() 的原因在 this 帖子中有很好的解释。事实证明(在对链接帖子的评论中)map 正在阻塞,所以在这里你不需要它们,因为最初的原因(等到它们全部完成,然后从一个进程写入组合输出文件) .如果以后添加其他并行功能,我仍然会使用它们。

在最后一步中,fcombine 将所有临时文件收集并复制到一个文件中。它有点太嵌套了,例如,如果您决定在复制后删除临时文件,您可能需要使用 with open('dest', ).. 下的单独函数或下面的 for 循环 - 以提高可读性和功能性。

【讨论】:

  • 非常感谢。此外,澄清对 .close() 和 .join() 项目的需求使我更好地理解了代码。祝你好运。
  • 很高兴它有帮助。前几天我使用了这种方法;)稍后会添加它,现在必须按字面意思进行。
  • 实际上已编辑 - 事实证明您不需要它们与地图。但就像编辑中所说的那样,我仍然会把它留在那里。引用代码中解释的原因是异常消息,但也是一种习惯,因为并行功能并不总是非阻塞的。另一方面,在带有 MPI 的 C/C++ 中,如果不需要,我永远不会使用障碍,我想在编码习惯方面我需要下定决心;)
【解决方案2】:

Multiprocessing.pool 生成进程,在没有锁定的情况下从每个进程写入公共文件可能会导致数据丢失。 正如您所说,您正在尝试并行化计算,multiprocessing.pool 可用于并行化计算。

以下是做并行计算并将结果写入文件的解决方案,希望对您有所帮助:

from multiprocessing import Pool

# library for time 
import datetime

# file in which you want to write 
fout = open('test.txt', 'wb')

# function for your calculations, i have tried it to make time consuming
def calc(x):
    x = x**2
    sum = 0
    for i in range(0, 1000000):
        sum += i
    return x

# function to write in txt file, it takes list of item to write
def f(res):
    global fout
    for x in res:
        fout.write(str(x) + "\n")

if __name__ == '__main__':
    qs = datetime.datetime.now()
    arr = [1, 2, 3, 4, 5, 6, 7]
    p = Pool(5)
    res = p.map(calc, arr)
    # write the calculated list in file
    f(res)
    qe = datetime.datetime.now()
    print (qe-qs).total_seconds()*1000
    # to compare the improvement using multiprocessing, iterative solution
    qs = datetime.datetime.now()
    for item in arr:
        x = calc(item)
        fout.write(str(x)+"\n")
    qe = datetime.datetime.now()
    print (qe-qs).total_seconds()*1000

【讨论】:

  • 感谢 vsri293 的解释!这是非常明确的,但是由于广泛的解释,我采用了用户 atru 的解决方案。希望你能理解!
  • 感谢@TimD,atru 的解决方案很好,但 atru 的解决方案假设池进程的数量和您需要进行计算的数量可以相同。如果您只能合并 5 个进程并且必须计算 100 个数字(超过 5 个)怎么办?您必须编写额外的代码来压缩数字以输出文件名。
猜你喜欢
  • 1970-01-01
  • 2018-06-11
  • 2014-12-23
  • 2012-11-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-24
  • 2020-08-14
相关资源
最近更新 更多