【问题标题】:Improve performance of running large files提高运行大文件的性能
【发布时间】:2020-09-03 19:14:05
【问题描述】:

我知道关于这个主题有几个问题,但我似乎无法有效地进行。我有大型输入数据集(2-3 GB) 在我的机器上运行,其中包含8GB 的内存。我正在使用安装了pandas 0.24.0spyder 版本。输入文件目前需要大约一个小时来生成大约10MB 的输出文件。

我试图通过使用下面的代码对输入文件进行分块来优化该过程。本质上,我将chunk 输入文件分成更小的段,通过一些代码运行它并导出更小的输出。然后我删除分块信息以释放内存。但是内存仍然在整个操作过程中建立,并最终花费了相似的时间。我不确定我做错了什么:

文件的内存使用详情为:

RangeIndex: 5471998 entries, 0 to 5471997
Data columns (total 17 columns):
col1                     object
col2                     object
col3                     object
....
dtypes: object(17)
memory usage: 5.6 GB

我通过将cols_to_keep 传递给use_cols 来对df 进行子集化。但是每个文件的标题都不同,所以我使用位置索引来获取相关的标题。

# Because the column headers change from file to file I use location indexing to read the col headers I need
df_cols = pd.read_csv('file.csv')
# Read cols to be used
df_cols = df_cols.iloc[:,np.r_[1,3,8,12,23]]
# Export col headers
cols_to_keep = df_cols.columns

PATH = '/Volume/Folder/Event/file.csv'
chunksize = 10000

df_list = [] # list to hold the batch dataframe

for df_chunk in pd.read_csv(PATH, chunksize = chunksize, usecols = cols_to_keep):
    # Measure time taken to execute each batch
    print("summation download chunk time: " , time.clock()-t)

    # Execute func1
    df1 = func1(df_chunk)

    # Execute func2
    df2 = func1(df1)

    # Append the chunk to list and merge all
    df_list.append(df2) 

# Merge all dataframes into one dataframe
df = pd.concat(df_list)

# Delete the dataframe list to release memory
del df_list
del df_chunk

我尝试过使用 dask,但使用简单的 pandas 方法会出现各种错误。

import dask.dataframe as ddf

df_cols = pd.read_csv('file.csv')
df_cols = df_cols.iloc[:,np.r_[1:3,8,12,23:25,32,42,44,46,65:67,-5:0,]]
cols_to_keep = df_cols.columns

PATH = '/Volume/Folder/Event/file.csv'
blocksize = 10000


df_list = [] # list to hold the batch dataframe


df_chunk = ddf.read_csv(PATH, blocksize = blocksize, usecols = cols_to_keep, parse_dates = ['Time']):
    print("summation download chunk time: " , time.clock()-t)

    # Execute func1
    df1 = func1(df_chunk)

    # Execute func2
    df2 = func1(df1)

    # Append the chunk to list and merge all
    df_list.append(df2)


    delayed_results = [delayed(df2) for df_chunk in df_list]

引发错误的行:

df1 = func1(df_chunk)

  name_unq = df['name'].dropna().unique().tolist()

AttributeError: 'Series' object has no attribute 'tolist'

我已经通过了许多函数,但它只是继续抛出错误。

【问题讨论】:

  • 你不想使用数据库? sqlite 也许?
  • Del 不会立即释放内存。如果内存未使用,它会等待垃圾收集器启动以释放内存。您可以使用 gc.collect() 强制 gc。
  • 如果行顺序不重要,为什么不让每个 for 迭代在一个线程中运行?
  • 我会看看gc.collect() 函数。以前没听说过。您能否再解释一下第二条评论。我怎么能在一个线程中运行它?
  • 内存分析器是帮助隔离意外使用源的工具。

标签: python pandas memory chunked-encoding


【解决方案1】:

要处理您的文件,请使用 dask,它仅用于处理 带有大(实际上,非常大)文件。

它还有 read_csv 功能,带有额外的 blocksize 参数,以 定义单个块的大小。

read_csv 的结果在概念上是一个单一的 (dask) DataFrame,它 由一系列partitions组成,实际上是pandasonic DataFrames。

然后你可以使用 map_partitions 函数来应用你的函数 到每个分区。 因为这个函数(传递给 map_partitions)在单个 分区(pandasonic DataFrame),您可以使用任何代码, 之前在 Pandas 环境中测试过。

这种解决方案的优点是可以处理单个分区 在可用内核之间进行划分,而 Pandas 只使用一个内核。

所以你的循环应该被修改为:

  • read_csvdask 版本),
  • map_partitions 从每个分区生成部分结果,
  • concat - 连接这些部分结果(现在的结果 仍然是一个dask DataFrame,
  • 将其转换为 pandasonic DataFrame,
  • Pandas 环境中进一步使用它。

要了解更多详情,请阅读dask

【讨论】:

  • 在使用daskread_csv 时遇到各种错误。就好像它根本没有熊猫功能。不知道我做错了什么。
  • 看看stackoverflow.com/questions/59082723/…。前段时间我回答了一个关于原pandas的问题,但是我提出了一个仅基于dask的解决方案,PO很高兴。问题的类型不同(来自多个输入文件的数据聚合),但至少它会告诉你如何处理dask DataFrames,read_csv(在 dask 版本)和 map_partition.
  • 是的,它与多个 df 有点不同。我不明白如何使用 map_partitions。不过感谢您的帮助。
猜你喜欢
  • 1970-01-01
  • 2012-08-09
  • 1970-01-01
  • 1970-01-01
  • 2020-10-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多