【发布时间】:2018-06-22 19:27:55
【问题描述】:
我正在尝试从一堆大型 CSV 文件(当前 12 个文件,8-10 百万行和每个 50 列)创建一个dask.dataframe。其中一些可能会一起放入我的系统内存中,但它们肯定不会同时出现,因此使用 dask 而不是常规的 pandas。
由于读取每个 csv 文件涉及一些额外的工作(从文件路径中添加数据列),我尝试从延迟对象列表中创建 dask.dataframe,类似于to this example。
这是我的代码:
import dask.dataframe as dd
from dask.delayed import delayed
import os
import pandas as pd
def read_file_to_dataframe(file_path):
df = pd.read_csv(file_path)
df['some_extra_column'] = 'some_extra_value'
return df
if __name__ == '__main__':
path = '/path/to/my/files'
delayed_collection = list()
for rootdir, subdirs, files in os.walk(path):
for filename in files:
if filename.endswith('.csv'):
file_path = os.path.join(rootdir, filename)
delayed_reader = delayed(read_file_to_dataframe)(file_path)
delayed_collection.append(delayed_reader)
df = dd.from_delayed(delayed_collection)
print(df.compute())
启动此脚本(Python 3.4,dask 0.12.0)时,它会运行几分钟,而我的系统内存会不断填满。当它被完全使用时,一切都会开始滞后并运行几分钟,然后它会以killed 或MemoryError 崩溃。
我认为 dask.dataframe 的全部意义在于能够对跨越磁盘上多个文件的大于内存的数据帧进行操作,那么我在这里做错了什么?
编辑:据我所知,用df = dd.read_csv(path + '/*.csv') 读取文件似乎工作正常。但是,这不允许我使用文件路径中的其他数据来更改每个数据帧。
编辑#2: 按照 MRocklin 的回答,我尝试使用 dask 的 read_bytes() method 以及使用 single-threaded scheduler 以及将两者结合使用来读取我的数据。 尽管如此,即使在具有 8GB 内存的笔记本电脑上以单线程模式读取 100MB 的块时,我的进程迟早会被杀死。在一堆类似形状的小文件(每个大约 1MB)上运行下面所述的代码可以正常工作。 有什么想法我在这里做错了吗?
import dask
from dask.bytes import read_bytes
import dask.dataframe as dd
from dask.delayed import delayed
from io import BytesIO
import pandas as pd
def create_df_from_bytesio(bytesio):
df = pd.read_csv(bytesio)
return df
def create_bytesio_from_bytes(block):
bytesio = BytesIO(block)
return bytesio
path = '/path/to/my/files/*.csv'
sample, blocks = read_bytes(path, delimiter=b'\n', blocksize=1024*1024*100)
delayed_collection = list()
for datafile in blocks:
for block in datafile:
bytesio = delayed(create_bytesio_from_bytes)(block)
df = delayed(create_df_from_bytesio)(bytesio)
delayed_collection.append(df)
dask_df = dd.from_delayed(delayed_collection)
print(dask_df.compute(get=dask.async.get_sync))
【问题讨论】: