【问题标题】:Multithreading while reading and processing a huge file (too big for memory)读取和处理大文件时的多线程(内存太大)
【发布时间】:2019-06-29 04:09:42
【问题描述】:

我的以下代码运行速度非常慢。这是一个拆分大文件(80 gig)并将其放入树形文件夹结构以便快速查找的程序。我在代码中做了几个cmets来帮助大家理解。

# Libraries
import os


# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth


# Preperations
os.makedirs(outputdirectory)

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()


# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
    for line in infile:
        pipeline(line)

有没有办法让多线程工作?因为我自己尝试了一些我在网上找到的示例,它会将所有内容都放入内存中,导致我的计算机多次冻结。

【问题讨论】:

  • 因为瓶颈是 HDD 访问,所以不要指望并行化会大大加快速度(如果在您的系统中以某种方式实现并行文件访问,您可能会有所收获,但因为占用的不是 CPU,添加更多内核无济于事)
  • 只有一个核心使用率 100%,根据系统监控,我的磁盘使用率低于 4%。我有一个 NVMe SSD,所以我真的认为多核可能还有改进的空间。
  • 那听起来很有希望。大文件是否需要保持这种状态,或者您可以将其分成几块?如果分成块,并行化会容易得多
  • 大文件可能会被预处理并分割成块。我不熟悉使用块,所以如果你能指导我看几个例子,我可以研究如何解决这个问题。我在看blopig.com/blog/2016/08/processing-large-files-using-python,但不知何故最后一个代码块给了我ValueError: I/O operation on closed file.

标签: python python-3.x multithreading queue python-multithreading


【解决方案1】:

首先,(IMO)最简单的解决方案

如果看起来这些行是完全独立的,只需将文件拆分为 N 个块,将要打开的文件名作为程序参数传递并运行当前脚本的多个实例,在多个命令行上手动启动它们。

优点:

  • 无需深入研究多处理、进程间通信等
  • 不需要过多修改代码

缺点:

  • 您需要对大文件进行预处理,将其拆分为多个块(尽管这将比您当前的执行时间快得多,因为您不会遇到每行打开-关闭的场景)
  • 您需要自己启动进程,并为每个进程传递适当的文件名

这将被实现为:

预处理:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
  chunk_id = 0
  next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
  while next_chunk:
    with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
      ofp.writelines(next_chunk)
    chunk_id += 1
    next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

来自readlines docs

如果存在可选的 sizehint 参数,而不是读取到 EOF,而是读取总计大约 sizehint 字节的整行(可能在四舍五入到内部缓冲区大小之后)。

这样做并不能确保所有块中的行数都是偶数,但会使预处理速度更快,因为您是在块中读取而不是逐行读取。根据需要调整块大小。 另外,请注意,通过使用readlines,我们可以确保块之间不会出现断行,但由于函数返回行列表,我们使用writelines 将其写入输出文件(相当于循环遍历列表和ofp.write(line))。为了完整起见,请注意,您还可以连接内存中的所有字符串并只调用一次write(即,执行ofp.write(''.join(next_chunk))),这可能会为您带来一些(次要)性能优势,支付(多) 更高的 RAM 使用率。

主脚本:

您需要的唯一修改位于最顶部:

import sys
file=sys.argv[1]
... # rest of your script here

通过使用argv,您可以将命令行参数传递给您的程序(在这种情况下,是要打开的文件)。然后,只需将您的脚本运行为:

python process_the_file.py big_file_0.txt

这将运行一个进程。打开多个终端并使用big_file_N.txt 为每个终端运行相同的命令,它们将相互独立。

注意:我使用argv[1],因为对于所有程序,argv 的第一个值(即argv[0])始终是程序名称。


那么,multiprocessing 解决方案

虽然有效,但第一个解决方案不是很优雅,尤其是如果从 80GB 大小的文件开始,您将拥有 80 个文件。

更简洁的解决方案是使用 python 的 multiprocessing 模块(重要:不是 threading!如果您不知道区别,请查找“全局解释器锁”以及为什么 python 中的多线程不起作用你认为它会的方式)。

这个想法是有一个“生产者”进程来打开大文件并不断地将其中的行放入队列中。然后,“消费者”进程池从队列中提取行并进行处理。

优点:

  • 一个脚本完成所有工作
  • 无需打开多个终端打字

缺点:

  • 复杂性
  • 使用进程间通信,有一些开销

这将按如下方式实现:

# Libraries
import os
import multiprocessing

outputdirectory="sorted"
depth=4 # This is the tree depth

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()

if __name__ == '__main__':
    # Variables
    file="80_gig_file.txt"

    # Preperations
    os.makedirs(outputdirectory)
    pool = multiprocessing.Pool() # by default, 1 process per CPU
    LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM

    with open(file) as infile:
        next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
        pool.close()
        pool.join()

if __name__ == '__main__' 行是一个障碍,用于将运行在每个进程上的代码与仅运行在“父进程”上的代码分开。每个进程都定义了pipeline,但只有父进程实际生成了一个工人池并应用了该函数。您可以找到有关multiprocessing.map here 的更多详细信息

编辑:

增加了关闭和加入池,以防止主进程退出并杀死进程中的子进程。

【讨论】:

  • 我非常感谢您为这个答案付出的时间和精力。这真的很好解释和正确呈现。我试图在我的文本文件上运行代码,但不知何故它立即完成并且排序文件夹为空。你确定你打开文件的最后几行是正确的吗?我认为它会尝试将所有内容加载到内存中并立即退出。
  • 那是我的错,我用imap而不是map。不同之处在于,imap 返回一个可迭代对象,并且仅在迭代器请求下一个元素时才懒惰地评估这些值。由于代码从未要求下一个元素,因此什么也没有发生。用map 替换它是可行的方法(它会检查我刚刚运行的快速测试),让我知道这是否更好!
  • 仅供参考,还有一个writelines
  • @MarkTolonen 谢谢!我不知道 :) 我会更新代码以使用它!
  • @GPhilo,如果您将print(line) 放在def pipeline(line): 的正下方,出于调试目的,您可以看到文件名“80_gig_file.txt”逐字打印。它不会打印应该预期的行。如果我将 pool.map(pipeline, file, LINES_PER_PROCESS) 更改为 pool.map(pipeline, infile, LINES_PER_PROCESS) ,所有内容都会加载到内存中,我必须在计算机死机之前将其杀死。
猜你喜欢
  • 1970-01-01
  • 2020-01-26
  • 2017-11-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多