【问题标题】:How do you parallelize a program to read and write large files in python?如何并行化程序以在 python 中读取和写入大文件?
【发布时间】:2019-11-25 19:34:35
【问题描述】:

我正在尝试使用 Python 从约 3 亿行和约 200 GB 的大文件中读取和写入数据。我已经能够让基本代码工作,但想并行化它以便它运行得更快。为此,我一直在遵循本指南:https://www.blopig.com/blog/2016/08/processing-large-files-using-python/。但是,当我尝试并行化代码时,我收到一个错误:“TypeError:* 之后的 worker() 参数必须是可迭代的,而不是 int”。我怎样才能让代码运行,你对提高效率有什么建议吗?请注意,我对 Python 比较陌生。

基本代码(其中 id_pct1 和 id_pct001 是集合):

with open(file1) as f, open('file1', 'w') as out_f1, open('file2', 'w') as out_f001:
        for line in f:
            data = line.split('*')
            if data[30] in id_pct1: out_f1.write(line)
            if data[30] in id_pct001: out_f001.write(line)

并行代码:

def worker(lineByte):
      with open(file1) as f, open('file1', 'w') as out_f1, open('file2', 'w') as out_f001:
            f.seek(lineByte)
            line = f.readline()
            data = line.split('*')
            if data[30] in id_pct1: out_f1.write(line)
            if data[30] in id_pct001: out_f001.write(line)


def main():
   pool = mp.Pool()
   jobs = []

   with open('Subsets/FirstLines.txt') as f:
        nextLineByte = 0
        for line in f:
            jobs.append(pool.apply_async(worker,(nextLineByte)))
            nextLineByte += len(line)

        for job in jobs:
            job.get()

        pool.close()

if __name__ == '__main__':
    main()

【问题讨论】:

标签: python multiprocessing


【解决方案1】:

试试

 jobs.append(pool.apply_async(worker,(nextLineByte,)))

pool.apply_async() 需要一个可迭代对象。

(nextLineByte) 作为一个int,就是抛出的错误。

【讨论】:

  • 谢谢!我也意识到我还有其他一些问题。当我还将代码更改为 nextLineByte += len(line)+1 并为每个 stackoverflow.com/questions/22147166/… 添加一个“侦听器”时,该代码有效
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-09-17
  • 1970-01-01
  • 1970-01-01
  • 2012-11-12
  • 1970-01-01
  • 2020-08-19
  • 2014-03-20
相关资源
最近更新 更多