【问题标题】:How do I read a large csv file with pandas?如何使用 Pandas 读取大型 csv 文件?
【发布时间】:2023-02-20 11:09:41
【问题描述】:

我正在尝试在 pandas 中读取一个大的 csv 文件(大约 6 GB),但出现内存错误:

MemoryError                               Traceback (most recent call last)
<ipython-input-58-67a72687871b> in <module>()
----> 1 data=pd.read_csv('aphro.csv',sep=';')

...

MemoryError: 

有什么帮助吗?

【问题讨论】:

标签: python pandas csv memory chunks


【解决方案1】:

该错误表明机器没有足够的内存来读取整个 CSV 一次转换为 DataFrame。假设您不需要整个数据集 一次全部存储,避免该问题的一种方法是process the CSV in chunks(通过指定chunksize参数):

chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)

chunksize 参数指定每个块的行数。 (当然,最后一个块可能包含少于 chunksize 行。)


熊猫 >= 1.2

read_csvchunksize 返回一个上下文管理器,可以像这样使用:

chunksize = 10 ** 6
with pd.read_csv(filename, chunksize=chunksize) as reader:
    for chunk in reader:
        process(chunk)

GH38225

【讨论】:

  • 您通常需要 2 倍的最终内存才能读取某些内容(来自 csv,尽管其他格式在内存要求较低方面更好)。仅供参考,这对于尝试同时做几乎所有事情都是正确的。最好将它分块(它具有恒定的内存使用量)。
  • @altabq:这里的问题是我们没有足够的内存来构建一个包含所有数据的数据帧。上面的解决方案试图通过一次减少一个块(例如,通过聚合或仅提取所需信息)一个块来应对这种情况——从而节省内存。无论你做什么,都不要在循环内调用DF.append(chunk)。这将使用O(N^2) 复制操作。最好附加聚合数据到一个列表,然后从列表中构建 DataFrame一个电话pd.DataFramepd.concat(取决于聚合数据的类型)。
  • @altabq:在循环中调用DF.append(chunk)需要O(N^2)复制操作,其中N是块的大小,因为每次调用DF.append都会返回一个新的DataFrame。致电pd.DataFramepd.concat一次循环外减少复制到O(N)的数量。
  • @Pyderman:是的,chunksize 参数是指每个块的行数。当然,最后一个块可能包含少于 chunksize 行。
  • @Pyderman:是的;打电话pd.concat([list_of_dfs])一次在循环之后比在循环中多次调用 pd.concatdf.append 快得多。当然,您需要大量内存才能将整个 6GB csv 作为一个 DataFrame 保存。
【解决方案2】:

分块不应该总是这个问题的第一站。

  1. 由于重复的非数字数据或不需要的列,文件是否很大?

    如果是这样,您有时可以通过 reading in columns as categories 和通过 pd.read_csv usecols 参数选择所需的列来节省大量内存。

  2. 您的工作流程是否需要切片、操作、导出?

    如果是这样,您可以使用dask.dataframe 进行切片、执行计算并迭代导出。分块由 dask 静默执行,它也支持 pandas API 的一个子集。

  3. 如果一切都失败了,通过块逐行读取。

    Chunk via pandas 或通过 csv library 作为最后的手段。

【讨论】:

  • 看起来chunks和“行数”的意思是一样的吧?
  • @Belter,..是的。
【解决方案3】:

对于大数据,我建议您使用库“dask”
例如:

# Dataframes implement the Pandas API
import dask.dataframe as dd
df = dd.read_csv('s3://.../2018-*-*.csv')

您可以从文档here 中阅读更多内容。

另一个很好的选择是使用modin,因为所有功能都与 pandas 相同,但它利用分布式数据框架库,如 dask。

在我的项目中,另一个高级库是datatables

# Datatable python library
import datatable as dt
df = dt.fread("s3://.../2018-*-*.csv")

【讨论】:

  • 比大熊猫有任何好处,希望能增加一些建议
  • 我使用 Dask 的时间并不长,但在我的用例中的主要优势是 Dask 可以在多台机器上并行运行,它还可以将数据作为切片放入内存中。
  • 谢谢! dask 是 pandas 的替代品还是在 pandas 之上作为一层工作
  • 欢迎使用,它作为 Numpy、Pandas 和 Scikit-Learn 的包装器。
  • 我曾尝试用 Dask 解决几个问题,但总是会为所有问题抛出错误。即使有块,它也会抛出内存错误。见stackoverflow.com/questions/59865572/…
【解决方案4】:

我是这样进行的:

chunks=pd.read_table('aphro.csv',chunksize=1000000,sep=';',
       names=['lat','long','rf','date','slno'],index_col='slno',
       header=None,parse_dates=['date'])

df=pd.DataFrame()
%time df=pd.concat(chunk.groupby(['lat','long',chunk['date'].map(lambda x: x.year)])['rf'].agg(['sum']) for chunk in chunks)

【讨论】:

  • 您从read_csv 切换到read_table 有什么原因吗?
【解决方案5】:

您可以将数据作为块读入并将每个块保存为 pickle。

import pandas as pd 
import pickle

in_path = "" #Path where the large file is
out_path = "" #Path to save the pickle files to
chunk_size = 400000 #size of chunks relies on your available memory
separator = "~"

reader = pd.read_csv(in_path,sep=separator,chunksize=chunk_size, 
                    low_memory=False)    


for i, chunk in enumerate(reader):
    out_file = out_path + "/data_{}.pkl".format(i+1)
    with open(out_file, "wb") as f:
        pickle.dump(chunk,f,pickle.HIGHEST_PROTOCOL)

在下一步中,您读入 pickle 并将每个 pickle 附加到所需的数据框。

import glob
pickle_path = "" #Same Path as out_path i.e. where the pickle files are

data_p_files=[]
for name in glob.glob(pickle_path + "/data_*.pkl"):
   data_p_files.append(name)


df = pd.DataFrame([])
for i in range(len(data_p_files)):
    df = df.append(pd.read_pickle(data_p_files[i]),ignore_index=True)

【讨论】:

  • 如果您的最终 df 完全适合内存(如暗示的那样)并且包含与您的输入相同数量的数据,那么您肯定根本不需要分块吗?
  • 在这种情况下,如果您的文件非常宽(例如超过 100 列且包含大量字符串列),则您需要分块。这增加了在内存中保存 df 所需的内存。即使是像这样的 4GB 文件,在 64GB RAM 的盒子上最终也会使用 20 到 30GB 的 RAM。
【解决方案6】:

我想根据已经提供的大部分潜在解决方案做出更全面的回答。我还想指出一种可能有助于阅读过程的潜在帮助。

选项 1:数据类型

“dtypes”是一个非常强大的参数,您可以使用它来减少read 方法的内存压力。请参阅thisthis 回答。默认情况下,Pandas 会尝试推断数据的数据类型。

参考数据结构,每存储一个数据,就会发生一次内存分配。在基本级别上,请参考以下值(下表说明了 C 编程语言的值):

The maximum value of UNSIGNED CHAR = 255                                    
The minimum value of SHORT INT = -32768                                     
The maximum value of SHORT INT = 32767                                      
The minimum value of INT = -2147483648                                      
The maximum value of INT = 2147483647                                       
The minimum value of CHAR = -128                                            
The maximum value of CHAR = 127                                             
The minimum value of LONG = -9223372036854775808                            
The maximum value of LONG = 9223372036854775807

参考this页面查看NumPy和C类型的匹配。

假设您有一个整数数组数字.您可以在理论上和实践中分配,比如 16 位整数类型的数组,但是您分配的内存将超过存储该数组的实际需要。为防止这种情况,您可以在read_csv 上设置dtype 选项。您不想将数组项存储为长整数,而实际上您可以使用 8 位整数(np.int8np.uint8)来存储它们。

观察以下 dtype 映射。

来源:https://pbpython.com/pandas_dtypes.html

您可以将 dtype 参数作为 pandas 方法的参数作为参数传递给 read 上的字典,例如 {column: type}。

import numpy as np
import pandas as pd

df_dtype = {
        "column_1": int,
        "column_2": str,
        "column_3": np.int16,
        "column_4": np.uint8,
        ...
        "column_n": np.float32
}

df = pd.read_csv('path/to/file', dtype=df_dtype)

选项 2:按块读取

分块读取数据允许您访问内存中的部分数据,并且您可以对数据应用预处理并保留处理后的数据而不是原始数据。如果将此选项与第一个选项结合使用会更好,数据类型.

我想指出该过程的 pandas 食谱部分,您可以在其中找到它here。注意那里的那两个部分;

选项 3:达斯克

Dask 是一个框架,在Dask's website 中定义为:

Dask 为分析提供高级并行性,为您喜欢的工具实现大规模性能

它的诞生是为了覆盖 pandas 无法到达的必要部分。 Dask 是一个强大的框架,它允许您通过以分布式方式处理数据来访问更多数据。

您可以使用 dask 对整个数据进行预处理,Dask 负责分块部分,因此与 pandas 不同,您只需定义处理步骤并让 Dask 完成工作。 Dask 在被compute 和/或persist 明确推送之前不会应用计算(有关差异,请参阅答案here)。

其他帮助(想法)

  • 为数据设计的 ETL 流程。仅保留原始数据中需要的内容。
    • 首先,使用 Dask 或 PySpark 等框架将 ETL 应用于整个数据,并导出处理后的数据。
    • 然后看处理后的数据是否可以整体放入内存。
  • 考虑增加 RAM。
  • 考虑在云平台上使用该数据。

【讨论】:

    【解决方案7】:

    在使用 chunksize 选项之前,如果您想确定要在 @unutbu 提到的分块 for 循环中编写的过程函数,您可以简单地使用 nrows 选项。

    small_df = pd.read_csv(filename, nrows=100)
    

    一旦确定进程块已准备就绪,就可以将其放入整个数据帧的分块 for 循环中。

    【讨论】:

      【解决方案8】:

      函数read_csv和read_table几乎是一样的。但是在程序中使用read_table 函数时必须指定分隔符“,”。

      def get_from_action_data(fname, chunk_size=100000):
          reader = pd.read_csv(fname, header=0, iterator=True)
          chunks = []
          loop = True
          while loop:
              try:
                  chunk = reader.get_chunk(chunk_size)[["user_id", "type"]]
                  chunks.append(chunk)
              except StopIteration:
                  loop = False
                  print("Iteration is stopped")
      
          df_ac = pd.concat(chunks, ignore_index=True)
      

      【讨论】:

      • 如果在这篇文章中说明您的问题是什么,将会有所帮助。比如“read_csv 和 read_table 有什么区别?”或“为什么读表需要分隔符?”
      • 这取决于您的文件的外观。某些文件具有常见的分隔符,例如“,”或“|”或 " ",但您可能会看到其他带有分隔符的文件,例如 0x01、0x02(弥补这一点)等。因此 read_table 更适合不常见的分隔符,但 read_csv 可以同样出色地完成同样的工作。
      【解决方案9】:

      解决方案 1:

      Using pandas with large data

      解决方案 2:

      TextFileReader = pd.read_csv(path, chunksize=1000)  # the number of rows per chunk
      
      dfList = []
      for df in TextFileReader:
          dfList.append(df)
      
      df = pd.concat(dfList,sort=False)
      

      【讨论】:

      • 在这里,我们再次将 6 GB 文件完全加载到内存中,是否有任何选项,我们可以处理当前块,然后读取下一个块
      • 只是不要做dfList.append,只是分别处理每个块(df
      【解决方案10】:

      下面是一个例子:

      chunkTemp = []
      queryTemp = []
      query = pd.DataFrame()
      
      for chunk in pd.read_csv(file, header=0, chunksize=<your_chunksize>, iterator=True, low_memory=False):
      
          #REPLACING BLANK SPACES AT COLUMNS' NAMES FOR SQL OPTIMIZATION
          chunk = chunk.rename(columns = {c: c.replace(' ', '') for c in chunk.columns})
      
          #YOU CAN EITHER: 
          #1)BUFFER THE CHUNKS IN ORDER TO LOAD YOUR WHOLE DATASET 
          chunkTemp.append(chunk)
      
          #2)DO YOUR PROCESSING OVER A CHUNK AND STORE THE RESULT OF IT
          query = chunk[chunk[<column_name>].str.startswith(<some_pattern>)]   
          #BUFFERING PROCESSED DATA
          queryTemp.append(query)
      
      #!  NEVER DO pd.concat OR pd.DataFrame() INSIDE A LOOP
      print("Database: CONCATENATING CHUNKS INTO A SINGLE DATAFRAME")
      chunk = pd.concat(chunkTemp)
      print("Database: LOADED")
      
      #CONCATENATING PROCESSED DATA
      query = pd.concat(queryTemp)
      print(query)
      

      【讨论】:

        【解决方案11】:

        您可以尝试 sframe,它具有与 pandas 相同的语法,但允许您操作比 RAM 大的文件。

        【讨论】:

        【解决方案12】:

        如果你使用 pandas 将大文件读入块然后逐行产生,这就是我所做的

        import pandas as pd
        
        def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
           for chunk in pd.read_csv(filename,delimiter=',', iterator=True, chunksize=chunk_size, parse_dates=[1] ): 
                yield (chunk)
        
        def _generator( filename, header=False,chunk_size = 10 ** 5):
            chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
            for row in chunk:
                yield row
        
        if __name__ == "__main__":
        filename = r'file.csv'
                generator = generator(filename=filename)
                while True:
                   print(next(generator))
        

        【讨论】:

          【解决方案13】:

          如果有人仍在寻找类似的东西,我发现这个名为 modin 的新库可以提供帮助。它使用可以帮助读取的分布式计算。这是一个很好的 article 将其功能与 pandas 进行比较。它本质上使用与 pandas 相同的功能。

          import modin.pandas as pd
          pd.read_csv(CSV_FILE_NAME)
          

          【讨论】:

          【解决方案14】:

          如果你有 csv 文件和 millions 的数据条目,并且你想加载完整的数据集,你应该使用 dask_cudf

          import dask_cudf as dc
          
          df = dc.read_csv("large_data.csv")
          

          【讨论】:

            【解决方案15】:

            除了上面的答案,对于那些想处理 CSV 然后导出到 csv、parquet 或 SQL 的人来说,d6tstack 是另一个不错的选择。您可以加载多个文件,它处理数据架构更改(添加/删除列)。分块的核心支持已经内置。

            def apply(dfg):
                # do stuff
                return dfg
            
            c = d6tstack.combine_csv.CombinerCSV([bigfile.csv], apply_after_read=apply, sep=',', chunksize=1e6)
            
            # or
            c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply, chunksize=1e6)
            
            # output to various formats, automatically chunked to reduce memory consumption
            c.to_csv_combine(filename='out.csv')
            c.to_parquet_combine(filename='out.pq')
            c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # fast for postgres
            c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename') # fast for mysql
            c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename') # slow but flexible
            

            【讨论】:

              猜你喜欢
              • 2014-11-15
              • 1970-01-01
              • 2019-02-22
              • 2021-09-02
              • 1970-01-01
              相关资源
              最近更新 更多