【问题标题】:Splitting a CSV file into equal parts?将 CSV 文件分成相等的部分?
【发布时间】:2015-06-19 21:41:43
【问题描述】:

我有一个大的 CSV 文件,我想将其拆分为一个等于系统中 CPU 内核数的数字。然后我想使用多进程让所有核心一起处理文件。但是,我什至无法将文件分成几部分。我查看了整个谷歌,发现一些示例代码似乎可以满足我的需求。这是我目前所拥有的:

def split(infilename, num_cpus=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    total_file_size = os.path.getsize(infilename)
    print total_file_size
    files = list()
    with open(infilename, 'rb') as infile:
        for i in xrange(num_cpus):
            files.append(tempfile.TemporaryFile())
            this_file_size = 0
            while this_file_size < 1.0 * total_file_size / num_cpus:
                files[-1].write(infile.read(READ_BUFFER))
                this_file_size += READ_BUFFER
        files[-1].write(infile.readline()) # get the possible remainder
        files[-1].seek(0, 0)
    return files

files = split("sample_simple.csv")
print len(files)

for ifile in files:
    reader = csv.reader(ifile)
    for row in reader:
        print row

这两个打印显示了正确的文件大小,并且它被分成了 4 个部分(我的系统有 4 个 CPU 内核)。

但是,打印每个片段中所有行的代码的最后一部分给出了错误:

for row in reader:
_csv.Error: line contains NULL byte

我尝试在不运行拆分功能的情况下打印行,它正确打印了所有值。我怀疑 split 函数在生成的 4 个文件片段中添加了一些 NULL 字节,但我不知道为什么。

有谁知道这是否是一种正确且快速的文件分割方法?我只想要 csv.reader 可以成功读取的结果片段。

【问题讨论】:

  • 您的文件中有空字节吗?用 repr 打印行
  • 我可以假设没有,因为打印原始文件的行而不拆分是成功的吗?
  • 一个简单的方法是获取行数并将文件分成 n 片
  • 您不能在任意点拆分 csv 文件,文件格式是面向行的,因此任何拆分都必须发生在行之间的边界处 — 这意味着您知道它们在哪里.
  • 您确实要求拆分 CSV 文件并且已经有了答案。但是,您还给出了使用所有 CPU 内核的理由。有两点。您应该检查文件 I/O 或数字运算是否是您的瓶颈。你知道global interpreter lock

标签: python csv split null byte


【解决方案1】:

正如我在评论中所说,csv 文件需要在行(或行)边界上拆分。您的代码没有这样做,并且可能会在其中的某个地方将它们分解 - 我怀疑这是您的 _csv.Error 的原因。

下面通过将输入文件处理为一系列行来避免这样做。我已经对其进行了测试,它似乎可以独立工作,因为它将示例文件分成 大约 大小相等的块,因为整个行数不太可能完全适合一个块。

更新

这是一个大大比我最初发布的代码更快的版本。改进是因为它现在使用临时文件自己的tell() 方法来确定正在写入的文件的不断变化的长度,而不是调用os.path.getsize(),这消除了flush() 文件和调用os.fsync() 的需要写完每一行后在上面。

import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_chunks=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'in_file_size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'target chunk_size:', chunk_size
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    temp_file.write(infile.next())
                except StopIteration:  # end of infile
                    break
            temp_file.seek(0)  # rewind
            files.append(temp_file)
    return files

files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))

for i, ifile in enumerate(files, start=1):
    print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
    print 'contents of file {}:'.format(i)
    reader = csv.reader(ifile)
    for row in reader:
        print row
    print ''

【讨论】:

  • 感谢您的帮助。这段代码确实有效,但在一个 130MB 的文件上,它花了将近 20 分钟。我经常处理高达 50GB 的文件。有没有办法让它更有效率?似乎有很多硬盘访问。
  • @Colin:文件的分割本质上是一个耗时的过程,因为它至少涉及读取和写入整个文件的数据。当我根据文档添加os.fsync() 时,速度明显放缓,即使它似乎在我的系统上没有它也能正常工作。如果可以接受的临时文件的大小较小,则可以每隔一个或每三行比较大小。另一种方法是从数学上精确的分割点开始,然后通过从该位置向前读取到最近的换行符来调整每个分割点。
  • @Colin:你真的需要物理拆分文件吗?很可能有多个进程同时读取同一个文件。
  • 谢谢,现在速度快多了。更新代码大约需要 2.5 秒。我不关心每个块的确切大小,只要它们大致相同。最初我以为我会拆分文件,以便将它们提供给每个 CPU 内核。我的目标是分析 csv 文件并将某些列相乘,然后找到它的移动平均值。如果我同时使用 4 个内核访问同一个文件,是否会影响性能?
  • 我不认为我之前的建议不检查每一行的大小会对当前代码产生太大影响,我怀疑它现在是 I/O 绑定的。解决这个问题的唯一方法是摆脱它,这就是为什么我建议让每个进程访问同一个文件,我认为这样做本身不会存在重大的性能问题。跟踪每个进程应该限制自己处理的文件的哪一部分仍然会产生一些开销——这意味着您仍然需要知道相对于每个文件“块”而言行边界位于何处。
猜你喜欢
  • 2019-04-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-19
  • 2015-05-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多