【问题标题】:Killed/MemoryError when creating a large dask.dataframe from delayed collection从延迟收集创建大型 dask.dataframe 时出现 Killed/MemoryError
【发布时间】:2018-06-22 19:27:55
【问题描述】:

我正在尝试从一堆大型 CSV 文件(当前 12 个文件,8-10 百万行和每个 50 列)创建一个dask.dataframe。其中一些可能会一起放入我的系统内存中,但它们肯定不会同时出现,因此使用 dask 而不是常规的 pandas。

由于读取每个 csv 文件涉及一些额外的工作(从文件路径中添加数据列),我尝试从延迟对象列表中创建 dask.dataframe,类似于to this example

这是我的代码:

import dask.dataframe as dd
from dask.delayed import delayed
import os
import pandas as pd

def read_file_to_dataframe(file_path):
    df = pd.read_csv(file_path)
    df['some_extra_column'] = 'some_extra_value'
    return df

if __name__ == '__main__':
    path = '/path/to/my/files'
    delayed_collection = list()
    for rootdir, subdirs, files in os.walk(path):
        for filename in files:
            if filename.endswith('.csv'):
                file_path = os.path.join(rootdir, filename)
                delayed_reader = delayed(read_file_to_dataframe)(file_path)
                delayed_collection.append(delayed_reader)

    df = dd.from_delayed(delayed_collection)
    print(df.compute())

启动此脚本(Python 3.4,dask 0.12.0)时,它会运行几分钟,而我的系统内存会不断填满。当它被完全使用时,一切都会开始滞后并运行几分钟,然后它会以killedMemoryError 崩溃。

我认为 dask.dataframe 的全部意义在于能够对跨越磁盘上多个文件的大于内存的数据帧进行操作,那么我在这里做错了什么?

编辑:据我所知,用df = dd.read_csv(path + '/*.csv') 读取文件似乎工作正常。但是,这不允许我使用文件路径中的其他数据来更改每个数据帧。

编辑#2: 按照 MRocklin 的回答,我尝试使用 dask 的 read_bytes() method 以及使用 single-threaded scheduler 以及将两者结合使用来读取我的数据。 尽管如此,即使在具有 8GB 内存的笔记本电脑上以单线程模式读取 100MB 的块时,我的进程迟早会被杀死。在一堆类似形状的小文件(每个大约 1MB)上运行下面所述的代码可以正常工作。 有什么想法我在这里做错了吗?

import dask
from dask.bytes import read_bytes
import dask.dataframe as dd
from dask.delayed import delayed
from io import BytesIO
import pandas as pd

def create_df_from_bytesio(bytesio):
    df = pd.read_csv(bytesio)
    return df

def create_bytesio_from_bytes(block):
    bytesio = BytesIO(block)
    return bytesio


path = '/path/to/my/files/*.csv'

sample, blocks = read_bytes(path, delimiter=b'\n', blocksize=1024*1024*100)
delayed_collection = list()
for datafile in blocks:
    for block in datafile:
        bytesio = delayed(create_bytesio_from_bytes)(block)
        df = delayed(create_df_from_bytesio)(bytesio)
        delayed_collection.append(df)

dask_df = dd.from_delayed(delayed_collection)
print(dask_df.compute(get=dask.async.get_sync))

【问题讨论】:

    标签: python dataframe dask


    【解决方案1】:

    如果您的每个文件都很大,那么在 Dask 有机会变聪明之前,对 read_file_to_dataframe 的几个并发调用可能会导致内存泛滥。

    Dask 尝试通过按顺序运行函数来在低内存中操作,以便它可以快速删除中间结果。但是,如果仅仅几个函数的结果就可以填满内存,那么 Dask 可能永远没有机会删除东西。例如,如果您的每个函数产生一个 2GB 的数据帧,并且如果您有 8 个线程同时运行,那么您的函数可能会在 Dask 的调度策略启动之前产生 16GB 的数据。

    一些选项

    使用 dask.bytes.read_bytes

    read_csv 工作的原因是它将大型 CSV 文件分块为许多 ~100MB 字节块(请参阅 blocksize= 关键字参数)。你也可以这样做,虽然这很棘手,因为你需要总是在端线中断。

    dask.bytes.read_bytes 函数可以在这里为您提供帮助。它可以将单个路径转换为delayed 对象列表,每个对象对应于该文件的一个字节范围,该文件在分隔符上干净地开始和停止。然后,您可以将这些字节放入 io.BytesIO(标准库)并在其上调用 pandas.read_csv。请注意,您还必须处理标题等。该函数的文档字符串很广泛,应该提供更多帮助。

    使用单线程

    在上面的示例中,如果我们没有来自并行性的 8 倍乘数,一切都会好起来的。我怀疑如果你一次只运行一个函数,那么事情可能会在没有达到你的内存限制的情况下进行管道传输。您可以使用以下行将 dask 设置为仅使用单个线程

    dask.set_options(get=dask.async.get_sync)
    

    注意For Dask 版本 >= 0.15,您需要改用 dask.local.get_sync

    确保结果适合内存(响应编辑 2)

    如果你创建一个 dask.dataframe 然后立即计算它

    ddf = dd.read_csv(...)
    df = ddf.compute()
    

    您正在将所有数据加载到 Pandas 数据帧中,这最终会耗尽内存。相反,最好在 Dask 数据帧上进行操作,并且只计算小的结果。

    # result = df.compute()  # large result fills memory
    result = df.groupby(...).column.mean().compute()  # small result
    

    转换成不同的格式

    CSV 是一种普遍且实用的格式,但也存在一些缺陷。您可以考虑使用 HDF5 或 Parquet 之类的数据格式。

    【讨论】:

    • 谢谢,直到现在我才重新讨论这个话题。这看起来确实是一些好主意。但是,使用您的前两个建议仍然无效,请参阅我的编辑#2。我将首先尝试转换为不同的格式。
    • dask_df.compute(get=dask.async.get_sync) 这一行将所有数据评估为内存中的一个 pandas 数据帧。相反,您应该对 dask 数据帧进行操作,并且只计算较小的结果,例如 dask_df.groupby(...).column.mean().compute()。结果预计适合内存
    • 我想我不明白为什么调度程序知道提交的对 read_file_to_dataframe 调用的集合,不尝试将所需的未来内存绑定为时间的函数,并相应地进行调度。跨度>
    • 我认为 Dask 的卖点之一是它可以处理大于内存数据的核外数据。为什么我不断收到 MemoryError,为什么 Stack Overflow 上有这么多关于此的帖子?是否有单行修复?
    猜你喜欢
    • 2023-03-11
    • 1970-01-01
    • 1970-01-01
    • 2019-12-16
    • 2011-10-13
    • 1970-01-01
    • 1970-01-01
    • 2013-05-24
    • 1970-01-01
    相关资源
    最近更新 更多