【问题标题】:How do I read a large csv file with pandas?如何使用 pandas 读取大型 csv 文件?
【发布时间】:2014-11-15 17:06:29
【问题描述】:

我正在尝试在 pandas 中读取一个大型 csv 文件(大约 6 GB),但出现内存错误:

MemoryError                               Traceback (most recent call last)
<ipython-input-58-67a72687871b> in <module>()
----> 1 data=pd.read_csv('aphro.csv',sep=';')

...

MemoryError: 

有什么帮助吗?

【问题讨论】:

标签: python pandas csv memory chunks


【解决方案1】:

错误说明机器没有足够的内存来读取整个 一次将 CSV 转换为 DataFrame。假设您不需要整个数据集 一次全部内存,避免该问题的一种方法是process the CSV in chunks(通过指定chunksize参数):

chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)

chunksize 参数指定每个块的行数。 (当然,最后一个块可能包含少于chunksize 行。)


熊猫 >= 1.2

read_csvchunksize 返回一个上下文管理器,可以这样使用:

chunksize = 10 ** 6
with pd.read_csv(filename, chunksize=chunksize) as reader:
    for chunk in reader:
        process(chunk)

GH38225

【讨论】:

  • 您通常需要 2X 的最终内存来读取某些内容(来自 csv,尽管其他格式在内存要求较低方面更好)。仅供参考,这对于尝试同时做几乎所有事情是正确的。对它进行分块要好得多(内存使用量恒定)。
  • @altabq:这里的问题是我们没有足够的内存来构建一个保存所有数据的 DataFrame。上面的解决方案试图通过减少块(例如,通过聚合或仅提取所需信息)一次一个块来应对这种情况——从而节省内存。无论你做什么,都不要在循环内调用DF.append(chunk)。这将使用O(N^2) 复制操作。最好将聚合数据添加到列表,然后通过一次调用pd.DataFramepd.concat从列表构建DataFrame(取决于类型汇总数据)。
  • @altabq:在循环中调用DF.append(chunk) 需要O(N^2) 复制操作,其中N 是块的大小,因为每次调用DF.append 都会返回一个新的DataFrame。在循环外调用pd.DataFramepd.concat 一次 可以减少复制到O(N) 的数量。
  • @Pyderman:是的,chunksize 参数指的是每个块的行数。当然,最后一个块可能包含少于chunksize 行。
  • @Pyderman:是的;在循环之后调用pd.concat([list_of_dfs]) 一次 比在循环内多次调用pd.concatdf.append 快得多。当然,您需要大量内存才能将整个 6GB csv 保存为一个 DataFrame。
【解决方案2】:

我是这样处理的:

chunks=pd.read_table('aphro.csv',chunksize=1000000,sep=';',\
       names=['lat','long','rf','date','slno'],index_col='slno',\
       header=None,parse_dates=['date'])

df=pd.DataFrame()
%time df=pd.concat(chunk.groupby(['lat','long',chunk['date'].map(lambda x: x.year)])['rf'].agg(['sum']) for chunk in chunks)

【讨论】:

  • 你从read_csv切换到read_table有什么原因吗?
【解决方案3】:

您可以尝试 sframe,它与 pandas 具有相同的语法,但允许您操作大于 RAM 的文件。

【讨论】:

【解决方案4】:

函数 read_csv 和 read_table 几乎一样。但在程序中使用函数read_table时,必须指定分隔符“,”。

def get_from_action_data(fname, chunk_size=100000):
    reader = pd.read_csv(fname, header=0, iterator=True)
    chunks = []
    loop = True
    while loop:
        try:
            chunk = reader.get_chunk(chunk_size)[["user_id", "type"]]
            chunks.append(chunk)
        except StopIteration:
            loop = False
            print("Iteration is stopped")

    df_ac = pd.concat(chunks, ignore_index=True)

【讨论】:

  • 如果在这篇文章中说明您的问题是什么,将会有所帮助。比如“read_csv 和 read_table 有什么区别?”或“为什么读取表需要分隔符?”
  • 这取决于您的文件的外观。一些文件有常见的分隔符,例如“,”或“|”或“\t”,但您可能会看到其他带有分隔符的文件,例如 0x01、0x02(制作这个)等。所以 read_table 更适合不常见的分隔符,但 read_csv 可以做同样的工作。
【解决方案5】:

如果你使用 pandas 将大文件读入块然后逐行产生,这就是我所做的

import pandas as pd

def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
   for chunk in pd.read_csv(filename,delimiter=',', iterator=True, chunksize=chunk_size, parse_dates=[1] ): 
        yield (chunk)

def _generator( filename, header=False,chunk_size = 10 ** 5):
    chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
    for row in chunk:
        yield row

if __name__ == "__main__":
filename = r'file.csv'
        generator = generator(filename=filename)
        while True:
           print(next(generator))

【讨论】:

    【解决方案6】:

    分块不应该总是解决这个问题的第一站。

    1. 文件是否因重复的非数字数据或不需要的列而变大?

      如果是这样,您有时可以看到 reading in columns as categories 节省大量内存并通过 pd.read_csv usecols 参数选择所需的列。

    2. 您的工作流程是否需要切片、操作、导出?

      如果是这样,您可以使用dask.dataframe 进行切片、执行计算和迭代导出。分块由 dask 静默执行,它还支持 pandas API 的子集。

    3. 如果一切都失败了,通过块逐行读取。

      分块via pandas 或通过csv library 作为最后的手段。

    【讨论】:

    • 看来chunks和“行数”的意思是一样的吧?
    • @Belter, ..是的。
    【解决方案7】:

    对于大数据,我建议您使用库“dask”
    例如:

    # Dataframes implement the Pandas API
    import dask.dataframe as dd
    df = dd.read_csv('s3://.../2018-*-*.csv')
    

    您可以从文档here 中了解更多信息。

    另一个不错的选择是使用 modin,因为所有功能都与 pandas 相同,但它利用了分布式数据帧库,例如 dask。

    在我的项目中,另一个高级库是 datatables

    # Datatable python library
    import datatable as dt
    df = dt.fread("s3://.../2018-*-*.csv")
    

    【讨论】:

    • 对熊猫的任何好处,可以多加一些指针
    • 我使用 Dask 的时间不长,但在我的用例中主要优点是 Dask 可以在多台机器上并行运行,它还可以将数据作为切片放入内存中。
    • 谢谢! dask 是 pandas 的替代品还是作为一个层在 pandas 之上工作
    • 欢迎,它可以作为 Numpy、Pandas 和 Scikit-Learn 的包装器。
    • 我尝试面对 Dask 的几个问题,但总是抛出一个错误。即使使用块它也会引发内存错误。见stackoverflow.com/questions/59865572/…
    【解决方案8】:

    除了上面的答案,对于那些想要处理 CSV 然后导出到 csv、parquet 或 SQL 的人来说,d6tstack 是另一个不错的选择。您可以加载多个文件,它处理数据架构更改(添加/删除的列)。已内置分块的核心支持。

    def apply(dfg):
        # do stuff
        return dfg
    
    c = d6tstack.combine_csv.CombinerCSV([bigfile.csv], apply_after_read=apply, sep=',', chunksize=1e6)
    
    # or
    c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply, chunksize=1e6)
    
    # output to various formats, automatically chunked to reduce memory consumption
    c.to_csv_combine(filename='out.csv')
    c.to_parquet_combine(filename='out.pq')
    c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # fast for postgres
    c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename') # fast for mysql
    c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # slow but flexible
    

    【讨论】:

      【解决方案9】:

      您可以将数据作为块读取并将每个块保存为泡菜。

      import pandas as pd 
      import pickle
      
      in_path = "" #Path where the large file is
      out_path = "" #Path to save the pickle files to
      chunk_size = 400000 #size of chunks relies on your available memory
      separator = "~"
      
      reader = pd.read_csv(in_path,sep=separator,chunksize=chunk_size, 
                          low_memory=False)    
      
      
      for i, chunk in enumerate(reader):
          out_file = out_path + "/data_{}.pkl".format(i+1)
          with open(out_file, "wb") as f:
              pickle.dump(chunk,f,pickle.HIGHEST_PROTOCOL)
      

      在下一步中,您将读取泡菜并将每个泡菜附加到所需的数据帧。

      import glob
      pickle_path = "" #Same Path as out_path i.e. where the pickle files are
      
      data_p_files=[]
      for name in glob.glob(pickle_path + "/data_*.pkl"):
         data_p_files.append(name)
      
      
      df = pd.DataFrame([])
      for i in range(len(data_p_files)):
          df = df.append(pd.read_pickle(data_p_files[i]),ignore_index=True)
      

      【讨论】:

      • 如果您的最终df 完全适合内存(如暗示的那样)并且包含与您的输入相同数量的数据,那么您肯定根本不需要分块吗?
      • 在这种情况下,例如,如果您的文件非常宽(例如超过 100 列和很多字符串列),您将需要分块。这增加了将 df 保存在内存中所需的内存。即使是这样的 4GB 文件,在 64 GB RAM 的机器上也可能最终使用 20 到 30 GB 的 RAM。
      【解决方案10】:

      解决方案一:

      Using pandas with large data

      解决方案 2:

      TextFileReader = pd.read_csv(path, chunksize=1000)  # the number of rows per chunk
      
      dfList = []
      for df in TextFileReader:
          dfList.append(df)
      
      df = pd.concat(dfList,sort=False)
      

      【讨论】:

      • 这里我们再次将 6 GB 的文件完全加载到内存中,有什么选择,我们可以处理当前块然后读取下一个块
      • 只是不要做dfList.append,只是分别处理每个块(df
      【解决方案11】:

      如果有人仍在寻找类似的东西,我发现这个名为 modin 的新库可以提供帮助。它使用可以帮助读取的分布式计算。这是一个很好的article 将其功能与熊猫进行比较。它本质上使用与 pandas 相同的功能。

      import modin.pandas as pd
      pd.read_csv(CSV_FILE_NAME)
      

      【讨论】:

      【解决方案12】:

      下面是一个例子:

      chunkTemp = []
      queryTemp = []
      query = pd.DataFrame()
      
      for chunk in pd.read_csv(file, header=0, chunksize=<your_chunksize>, iterator=True, low_memory=False):
      
          #REPLACING BLANK SPACES AT COLUMNS' NAMES FOR SQL OPTIMIZATION
          chunk = chunk.rename(columns = {c: c.replace(' ', '') for c in chunk.columns})
      
          #YOU CAN EITHER: 
          #1)BUFFER THE CHUNKS IN ORDER TO LOAD YOUR WHOLE DATASET 
          chunkTemp.append(chunk)
      
          #2)DO YOUR PROCESSING OVER A CHUNK AND STORE THE RESULT OF IT
          query = chunk[chunk[<column_name>].str.startswith(<some_pattern>)]   
          #BUFFERING PROCESSED DATA
          queryTemp.append(query)
      
      #!  NEVER DO pd.concat OR pd.DataFrame() INSIDE A LOOP
      print("Database: CONCATENATING CHUNKS INTO A SINGLE DATAFRAME")
      chunk = pd.concat(chunkTemp)
      print("Database: LOADED")
      
      #CONCATENATING PROCESSED DATA
      query = pd.concat(queryTemp)
      print(query)
      

      【讨论】:

        【解决方案13】:

        我想根据已经提供的大多数潜在解决方案做出更全面的回答。我还想指出另一种可能有助于阅读过程的潜在帮助。

        选项 1:数据类型

        "dtypes" 是一个非常强大的参数,您可以使用它来减少read 方法的内存压力。请参阅thisthis 答案。 Pandas 默认尝试推断数据的 dtypes。

        参考数据结构,每存储一个数据,就会发生一次内存分配。基本参考以下值(下表说明了 C 编程语言的值):

        The maximum value of UNSIGNED CHAR = 255                                    
        The minimum value of SHORT INT = -32768                                     
        The maximum value of SHORT INT = 32767                                      
        The minimum value of INT = -2147483648                                      
        The maximum value of INT = 2147483647                                       
        The minimum value of CHAR = -128                                            
        The maximum value of CHAR = 127                                             
        The minimum value of LONG = -9223372036854775808                            
        The maximum value of LONG = 9223372036854775807
        

        参考this页面查看NumPy和C类型的匹配。

        假设您有一个由数字组成的整数数组。您可以在理论上和实际上分配,例如 16 位整数类型的数组,但是您将分配比实际需要存储该数组更多的内存。为了防止这种情况,您可以在read_csv 上设置dtype 选项。您不想将数组项存储为长整数,实际上您可以使用 8 位整数(np.int8np.uint8)来适应它们。

        观察以下 dtype 映射。

        来源:https://pbpython.com/pandas_dtypes.html

        您可以将dtype 参数作为pandas 方法的参数作为read 上的dict 传递,例如{column: type}。

        import numpy as np
        import pandas as pd
        
        df_dtype = {
                "column_1": int,
                "column_2": str,
                "column_3": np.int16,
                "column_4": np.uint8,
                ...
                "column_n": np.float32
        }
        
        df = pd.read_csv('path/to/file', dtype=df_dtype)
        

        选项 2:按块读取

        分块读取数据允许您访问内存中的部分数据,并且您可以对数据进行预处理并保留处理后的数据而不是原始数据。如果将此选项与第一个选项 dtypes 结合使用会更好。

        我想指出该过程的 pandas 食谱部分,您可以在其中找到它 here。注意那里的那两个部分;

        选项 3:Dask

        Dask 是一个框架,在Dask's website 中定义为:

        Dask 为分析提供高级并行性,为您喜爱的工具提供大规模性能

        它的诞生是为了覆盖熊猫无法到达的必要部分。 Dask 是一个强大的框架,通过以分布式方式处理数据,您可以访问更多数据。

        您可以使用 dask 对整个数据进行预处理,Dask 负责分块部分,因此与 pandas 不同的是,您只需定义处理步骤并让 Dask 完成工作。 Dask 在compute 和/或persist 明确推送之前不会应用计算(请参阅答案here 了解差异)。

        其他辅助(想法)

        • 为数据设计的 ETL 流。只保留原始数据中需要的内容。
          • 首先,使用 Dask 或 PySpark 等框架将 ETL 应用于整个数据,然后导出处理后的数据。
          • 然后看看处理后的数据是否可以作为一个整体放入内存中。
        • 考虑增加 RAM。
        • 考虑在云平台上使用该数据。

        【讨论】:

          【解决方案14】:

          在使用 chunksize 选项之前,如果您想确定要在 @unutbu 提到的分块 for 循环中编写的过程函数,您可以简单地使用 nrows 选项。

          small_df = pd.read_csv(filename, nrows=100)
          

          一旦您确定流程块已准备就绪,您就可以将其放入整个数据帧的分块 for 循环中。

          【讨论】:

            【解决方案15】:

            如果你有 csv 文件和 millions 的数据输入,并且你想加载完整的数据集,你应该使用 dask_cudf

            import dask_cudf as dc
            
            df = dc.read_csv("large_data.csv")
            

            【讨论】:

              猜你喜欢
              • 2023-02-20
              • 1970-01-01
              • 2019-02-22
              • 2021-09-02
              • 1970-01-01
              相关资源
              最近更新 更多