【问题标题】:Reading a huge .csv file读取一个巨大的 .csv 文件
【发布时间】:2021-05-29 08:49:03
【问题描述】:

我目前正在尝试从 Python 2.7 中的 .csv 文件中读取数据,该文件最多有 100 万行和 200 列(文件范围从 100mb 到 1.6gb)。我可以对 300,000 行以下的文件执行此操作(非常缓慢),但是一旦超过此值,我就会出现内存错误。我的代码如下所示:

def getdata(filename, criteria):
    data=[]
    for criterion in criteria:
        data.append(getstuff(filename, criteron))
    return data

def getstuff(filename, criterion):
    import csv
    data=[]
    with open(filename, "rb") as csvfile:
        datareader=csv.reader(csvfile)
        for row in datareader: 
            if row[3]=="column header":
                data.append(row)
            elif len(data)<2 and row[3]!=criterion:
                pass
            elif row[3]==criterion:
                data.append(row)
            else:
                return data

getstuff 函数中的 else 子句的原因是所有符合条件的元素都会一起列在 csv 文件中,所以当我通过它们时我会离开循环以节省时间。

我的问题是:

  1. 我怎样才能让它与更大的文件一起使用?

  2. 有什么方法可以让它更快?

我的电脑有 8GB RAM,运行 64 位 Windows 7,处理器为 3.40 GHz(不确定您需要什么信息)。

【问题讨论】:

  • 我知道有几个看似相似的问题,但似乎没有一个对我的问题有足够的具体帮助。抱歉,如果我错过了一个。
  • 您应该将读取的数据存储在数据库中(例如 Sqlite),而不是将其保存在内存中。然后,您可以运行进一步的处理,例如在 db 上进行过滤

标签: python python-2.7 file csv


【解决方案1】:

您正在将所有行读入一个列表,然后处理该列表。 不要这样做

在生成行时对其进行处理。如果您需要先过滤数据,请使用生成器函数:

import csv

def getstuff(filename, criterion):
    with open(filename, "rb") as csvfile:
        datareader = csv.reader(csvfile)
        yield next(datareader)  # yield the header row
        count = 0
        for row in datareader:
            if row[3] == criterion:
                yield row
                count += 1
            elif count:
                # done when having read a consecutive series of rows 
                return

我还简化了您的过滤器测试;逻辑相同,但更简洁。

因为您只匹配符合条件的单个行序列,您也可以使用:

import csv
from itertools import dropwhile, takewhile

def getstuff(filename, criterion):
    with open(filename, "rb") as csvfile:
        datareader = csv.reader(csvfile)
        yield next(datareader)  # yield the header row
        # first row, plus any subsequent rows that match, then stop
        # reading altogether
        # Python 2: use `for row in takewhile(...): yield row` instead
        # instead of `yield from takewhile(...)`.
        yield from takewhile(
            lambda r: r[3] == criterion,
            dropwhile(lambda r: r[3] != criterion, datareader))
        return

您现在可以直接循环 getstuff()。在getdata()中做同样的事情:

def getdata(filename, criteria):
    for criterion in criteria:
        for row in getstuff(filename, criterion):
            yield row

现在在您的代码中直接循环 getdata()

for row in getdata(somefilename, sequence_of_criteria):
    # process row

您现在只在内存中保存 一行,而不是每个标准都有数千行。

yield 使函数成为 generator function,这意味着在您开始循环之前它不会做任何工作。

【讨论】:

  • 在使用这种技术和csv.DictReader 时,您是否获得相同的内存效率?因为我对 2.5GB .csv 文件的测试表明,在使用它而不是 csv.reader 时尝试像这样逐行迭代会导致 Python 进程增长到完整的 2.5GB 内存使用量。
  • @user5359531 这表明您在某处保留对字典对象的引用。 DictReader 本身 doesn’t retain references 所以问题出在其他地方。
【解决方案2】:

尽管Martijin 的回答可能是最好的。这是为初学者处理大型 csv 文件的一种更直观的方法。这允许您一次处理多组行或块。

import pandas as pd
chunksize = 10 ** 8
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)

【讨论】:

  • 为什么使用 pandas 会更直观?
  • 4 行代码对于像我这样的新手来说总是更好。
  • 常规的 Python 代码同样简短,可以让您按行处理。生成器功能仅用于过滤内容;你会如何在 Pandas 中进行同样的过滤?
  • 这太棒了!解决了我使用 pandas 加载和处理大型 csv 文件的问题。谢谢!
  • 即使某些行的内容跨越多行,它也能很好地工作!
【解决方案3】:

我进行了大量的振动分析并查看大型数据集(数千万和数亿点)。我的测试表明 pandas.read_csv() 函数比 numpy.genfromtxt() 快 20 倍。 genfromtxt() 函数比 numpy.loadtxt() 快 3 倍。对于大型数据集,您似乎需要 pandas。

我在讨论 MATLAB vs Python for vibration analysis 的博客上发布了我在此测试中使用的代码和数据集。

【讨论】:

  • OP 的主要问题不是速度问题,而是内存耗尽问题之一。使用不同的函数来处理文件本身并不能消除将其读入列表而不是使用流处理器的缺点。
【解决方案4】:

对于遇到这个问题的人。将pandas 与“chunksize”和“usecols”一起使用帮助我比其他建议的选项更快地读取一个巨大的 zip 文件。

import pandas as pd

sample_cols_to_keep =['col_1', 'col_2', 'col_3', 'col_4','col_5']

# First setup dataframe iterator, ‘usecols’ parameter filters the columns, and 'chunksize' sets the number of rows per chunk in the csv. (you can change these parameters as you wish)
df_iter = pd.read_csv('../data/huge_csv_file.csv.gz', compression='gzip', chunksize=20000, usecols=sample_cols_to_keep) 

# this list will store the filtered dataframes for later concatenation 
df_lst = [] 

# Iterate over the file based on the criteria and append to the list
for df_ in df_iter: 
        tmp_df = (df_.rename(columns={col: col.lower() for col in df_.columns}) # filter eg. rows where 'col_1' value grater than one
                                  .pipe(lambda x:  x[x.col_1 > 0] ))
        df_lst += [tmp_df.copy()] 

# And finally combine filtered df_lst into the final lareger output say 'df_final' dataframe 
df_final = pd.concat(df_lst)

【讨论】:

    【解决方案5】:

    对我有用的东西是超快的

    import pandas as pd
    import dask.dataframe as dd
    import time
    t=time.clock()
    df_train = dd.read_csv('../data/train.csv', usecols=[col1, col2])
    df_train=df_train.compute()
    print("load train: " , time.clock()-t)
    

    另一个可行的解决方案是:

    import pandas as pd 
    from tqdm import tqdm
    
    PATH = '../data/train.csv'
    chunksize = 500000 
    traintypes = {
    'col1':'category',
    'col2':'str'}
    
    cols = list(traintypes.keys())
    
    df_list = [] # list to hold the batch dataframe
    
    for df_chunk in tqdm(pd.read_csv(PATH, usecols=cols, dtype=traintypes, chunksize=chunksize)):
        # Can process each chunk of dataframe here
        # clean_data(), feature_engineer(),fit()
    
        # Alternatively, append the chunk to list and merge all
        df_list.append(df_chunk) 
    
    # Merge all dataframes into one dataframe
    X = pd.concat(df_list)
    
    # Delete the dataframe list to release memory
    del df_list
    del df_chunk
    

    【讨论】:

    • 第一个解决方案中的df_train=df_train.compute() 行不会将整个数据集加载到内存中...这是他试图不做的事情吗?
    • time.clock() 在 Python 3.3 中已被弃用,将从 Python 3.8 中删除:请改用 time.perf_counter()time.process_time()
    【解决方案6】:

    这是 Python3 的另一种解决方案:

    import csv
    with open(filename, "r") as csvfile:
        datareader = csv.reader(csvfile)
        count = 0
        for row in datareader:
            if row[3] in ("column header", criterion):
                doSomething(row)
                count += 1
            elif count > 2:
                break
    

    这里datareader是一个生成器函数。

    【讨论】:

    • 所以,这与使用 yield 运算符的解决方案一样有效。:抱歉,它没有。回调函数调用增加了更多开销,尤其是因为您必须显式地单独处理状态。
    • @MartijnPieters 谢谢。更新了答案。
    【解决方案7】:

    如果您使用的是 pandas 并且有大量 RAM(足以将整个文件读入内存),请尝试使用 pd.read_csvlow_memory=False,例如:

    import pandas as pd
    data = pd.read_csv('file.csv', low_memory=False)
    

    【讨论】:

      猜你喜欢
      • 2016-11-26
      • 1970-01-01
      • 2020-08-07
      • 1970-01-01
      • 2020-08-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多