【问题标题】:Python MemoryError - Is there a more efficient way of working with my huge CSV file?Python MemoryError - 是否有更有效的方式来处理我的巨大 CSV 文件?
【发布时间】:2013-07-04 14:47:56
【问题描述】:

[使用 Python3.3] 我有一个巨大的 CSV 文件,其中包含 XX 百万行并包含几列。我想读取该文件,添加几个计算列并吐出几个“分段”csv 文件。我在以下代码上尝试了一个较小的测试文件,它完全符合我的要求。但是现在我正在加载原始的 CSV 文件(大约 3.2 GB)并且我得到一个内存错误。有没有更节省内存的方法来编写下面的代码?

请注意,我对 Python 很陌生,因此可能有很多我不完全了解的东西。

输入数据示例:

email               cc  nr_of_transactions  last_transaction_date   timebucket  total_basket
email1@email.com    us  2                   datetime value          1           20.29
email2@email.com    gb  3                   datetime value          2           50.84
email3@email.com    ca  5                   datetime value          3           119.12
...                 ... ...                 ...                     ...         ...

这是我的代码:

import csv
import scipy.stats as stats
import itertools
from operator import itemgetter


def add_rankperc(filename):
    '''
    Function that calculates percentile rank of total basket value of a user (i.e. email) within a country. Next, it assigns the user to a rankbucket based on its percentile rank, using the following rules:
     Percentage rank between 75 and 100 -> top25
     Percentage rank between 25 and 74  -> mid50
     Percentage rank between 0 and 24   -> bottom25
    '''

    # Defining headers for ease of use/DictReader
    headers = ['email', 'cc', 'nr_transactions', 'last_transaction_date', 'timebucket', 'total_basket']
    groups = []

    with open(filename, encoding='utf-8', mode='r') as f_in:
        # Input file is tab-separated, hence dialect='excel-tab'
        r = csv.DictReader(f_in, dialect='excel-tab', fieldnames=headers)
        # DictReader reads all dict values as strings, converting total_basket to a float
        dict_list = []
        for row in r:
            row['total_basket'] = float(row['total_basket'])
            # Append row to a list (of dictionaries) for further processing
            dict_list.append(row)

    # Groupby function on cc and total_basket
    for key, group in itertools.groupby(sorted(dict_list, key=itemgetter('cc', 'total_basket')), key=itemgetter('cc')):
        rows = list(group)
        for row in rows:
            # Calculates the percentile rank for each value for each country
            row['rankperc'] = stats.percentileofscore([row['total_basket'] for row in rows], row['total_basket'])
            # Percentage rank between 75 and 100 -> top25
            if 75 <= row['rankperc'] <= 100:
                row['rankbucket'] = 'top25'
            # Percentage rank between 25 and 74 -> mid50
            elif 25 <= row['rankperc'] < 75:
                row['rankbucket'] = 'mid50'
            # Percentage rank between 0 and 24 -> bottom25
            else:
                row['rankbucket'] = 'bottom25'
            # Appending all rows to a list to be able to return it and use it in another function
            groups.append(row)
    return groups


def filter_n_write(data):
    '''
    Function takes input data, groups by specified keys and outputs only the e-mail addresses to csv files as per the respective grouping.
    '''

    # Creating group iterator based on keys
    for key, group in itertools.groupby(sorted(data, key=itemgetter('timebucket', 'rankbucket')), key=itemgetter('timebucket', 'rankbucket')):
        # List comprehension to create a list of lists of email addresses. One row corresponds to the respective combination of grouping keys.
        emails = list([row['email'] for row in group])
        # Dynamically naming output file based on grouping keys
        f_out = 'output-{}-{}.csv'.format(key[0], key[1])
        with open(f_out, encoding='utf-8', mode='w') as fout:
            w = csv.writer(fout, dialect='excel', lineterminator='\n')
            # Writerows using list comprehension to write each email in emails iterator (i.e. one address per row). Wrapping email in brackets to write full address in one cell.
            w.writerows([email] for email in emails)

filter_n_write(add_rankperc('infile.tsv'))

提前致谢!

【问题讨论】:

  • “我有一个巨大的 CSV 文件,其中包含大约 4600 万行和几列” .... 为什么?这是关于存储数据效率最低的方法...您应该切换数据存储方法,而不是尝试让 CSV 为您工作...为什么不尝试一些 SQL 呢? (或任何其他实际使用数据库或存储方法意味着用于存储大量数据 - 不像 csv 文件)
  • 因为这是一个 csv 是从数据库系统导出的。为什么我要编写 python 脚本是因为“分组”并将输出写入多个 csv 文件。没错,我可以在数据库系统中执行此操作,但它需要我下载每个电子邮件地址列表,最多可能是 180 个 csv 文件。因此,我考虑编写一个脚本来为我完成这项工作。这样更有意义吗?
  • 为什么不用Python直接与数据库交互呢?然后只需准确提取您需要的内容,并以最有效的方式创建您想要的输出/结果文件..

标签: python memory csv python-3.x


【解决方案1】:

pandas 库 (http://pandas.pydata.org/) 具有非常好的快速 CSV 读取功能 (http://pandas.pydata.org/pandas-docs/stable/io.html#io-read-csv-table)。作为额外的奖励,您将数据作为 numpy 数组,使得计算百分位数变得非常容易。 这个question 讨论了用 pandas 分块读取大型 CSV。

【讨论】:

  • 嗨罗伯特,我听说过熊猫,但没有经验。将研究它,希望它会有意义。同时,对于当前问题的任何其他帮助将不胜感激。
【解决方案2】:

我同意 Inbar Rose 的观点,即最好使用数据库函数 攻击这个问题。假设我们需要回答你提出的问题, 不过 - 我认为我们可以,但会牺牲速度。

您可能在构建所有行的列表时内存不足' 字典。我们可以通过只考虑行的一个子集来解决这个问题 一次。

这是我的第一步代码 - 大致是您的 add_rankperc 函数:

import csv
from scipy.stats import percentileofscore
from operator import itemgetter

# Run through the whole file once, saving each row to a file corresponding to
# its 'cc' column
cc_dict = {}
with open(input_path, encoding="utf-8", mode='r') as infile:
  csv_reader = csv.reader(infile, dialect="excel-tab")
  for row in csv_reader:
    cc = row[1]
    if cc not in cc_dict:
      intermediate_path = "intermediate_cc_{}.txt".format(cc)
      outfile = open(intermediate_path, mode='w', newline='')
      csv_writer = csv.writer(outfile)
      cc_dict[cc] = (intermediate_path, outfile, csv_writer)
    _ = cc_dict[cc][2].writerow(row)

# Close the output files
for cc in cc_dict.keys():
  cc_dict[cc][1].close()

# Run through the whole file once for each 'cc' value
for cc in cc_dict.keys():
  intermediate_path = cc_dict[cc][0]
  with open(intermediate_path, mode='r', newline='') as infile:
    csv_reader = csv.reader(infile)
    # Pick out all of the rows with the 'cc' value under consideration
    group = [row for row in csv_reader if row[1] == cc]
    # Get the 'total_basket' values for the group
    A_scores = [float(row[5]) for row in group]
    for row in group:
      # Compute this row's 'total_basket' score based on the rest of the
      # group's
      p = percentileofscore(A_scores, float(row[5]))
      row.append(p)
      # Categorize the score
      bucket = ("bottom25" if p < 25 else ("mid50" if p < 75 else "top100"))
      row.append(bucket)
  # Save the augmented rows to an intermediate file
  with open(output_path, mode='a', newline='') as outfile:
    csv_writer = csv.writer(outfile)
    csv_writer.writerows(group)

4600 万行很多,所以这可能会很慢。我避免使用 DictReader 模块的 csv 功能,只是对行进行了索引 直接避免这种开销。我还计算了第一个参数 percentileofscores 每组一次,而不是组中的每一行。

如果这可行,那么我认为您可以对filter_n_write 遵循相同的想法 函数 - 运行一次生成的中间文件,挑选出 (timebucket, rank) 对。然后再次浏览中间文件,一次 每一对。

【讨论】:

  • 嗨,博,您的代码正在运行,正在解决 MemoryError 问题。但是现在我又挖了一个,即脚本需要很长时间才能完成 - 到目前为止它已经运行了几个小时。研究其他解决方案。
  • 我编辑了我的解决方案,以避免为每个 'cc' 值旋转所有 46M 行,而是使用一堆中间文件。这有帮助吗?
猜你喜欢
  • 2013-02-02
  • 2018-03-23
  • 2018-12-19
  • 1970-01-01
  • 2018-07-25
  • 1970-01-01
  • 2010-11-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多