【问题标题】:Using dask to import many MAT files into one DataFrame使用 dask 将多个 MAT 文件导入一个 DataFrame
【发布时间】:2017-11-11 18:49:37
【问题描述】:

我有许多相同格式的 mat 文件,我希望将这些 mat 文件合并到一个带有 DatetimeIndex 的 DataFrame 中。目前,for 循环读取这些 mat 文件并使用 scipy.io.loadmat 将每个文件的内容加载到 pandas DataFrames 中,然后将每个 DataFrame 附加到 hdf5 表中。

每个 mat 文件都包含一个 4096x1024 的单精度矩阵,循环的初始每次迭代大约需要 1.5 秒。我已经用 806 个 mat 文件(12.5GB 大约需要 25 分钟)对此进行了测试,但我想将它应用到可能数百万的这些文件中,我有兴趣找到一个允许我导入新数据和查询的工作流和数据容器时间序列的子集。

是否可以使用 dask 或其他工具来加快此导入过程并创建单个可查询的时间序列?

for rot_file in rotation_files:
    print(rot_file)
    time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image.join(rot_id_df)
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True, complib='blosc', complevel=9)

似乎应该可以使用 dask.delayed 进行导入,但我不确定如何将其写入单个 hdf 文件。

【问题讨论】:

  • 不要只是在外部链接到您的代码,请将其编辑到您的帖子中。
  • 是的,这是可能的。特别是如果您事先知道 .mat 的数量,您可以预先分配一个 hdf5 表,其中包含与 .mat 一样多的行(第一维度)。其他维度使用您的恒定形状(或者如果它们不同,则不使用)。
  • 问题:目的是查询.mat文件中的所有数据还是创建hdf数据集?
  • 目的是查询所有数据,以便能够使用基于日期的索引轻松提取子集。它不一定必须在 hdf 中。

标签: python pandas hdf5 pytables dask


【解决方案1】:

为了查询数据,您不需要写入 dask 明确支持的数据格式。您可以按如下方式定义数据框:

def mat_to_dataframe(rot_file):
    time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image.join(rot_id_df)
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    return polar_image

from dask import delayed
import dask.dataframe as dd

parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
df = dd.from_delayed(parts)

这是一个“惰性”数据框:您可以对其应用类似 pandas 的计算,但这些只有在您调用 .compute() 时才会执行。如果 matload 进程持有 python GIL,那么我建议使用分布式调度程序(即使在单台机器上)client = dask.distributed.Client()

如果您可以先验知道每个部分的时间戳,那么您还可以将divisions= 提供给from_delayed,这意味着如果您的查询在索引上有过滤器,那么 dask 将知道哪些文件不需要正在加载。

如果加载过程很慢,并且您希望使用更快的格式进行查询,请尝试df.to_hdfdf.to_parquet。每个选项都有几个会影响您的表现的选项。

请注意,使用pd.to_datetime(time_stamps[0]) 可能更快地实现time_stamps[0].apply(convert_to_python_datetime).values

【讨论】:

  • 感谢 Martin,time_stamps 是 NTFS 时间戳,单位为 100 纳秒,to_datetime my conversion function is here 似乎无法正确解释。我试图在延迟的 DataFrame 上运行 to_hdf,但我认为它复制了一些记录,正在写入的 hdf 文件比原始文件有更多的记录。
  • 这听起来很奇怪——如果你能产生一个最小的可重现的例子,它可能会很有趣。
  • 这是我的错误,我不小心在 jupyter 笔记本单元格中使用了 %timeit 魔法而不是 %time 并调用了 to_hdf,因此它多次重复该过程。 dask 中是否有任何方法可以让您了解像 to_hdf 这样需要一些时间来执行的调用的进度或运行时间?
  • 您需要 dask 分析器或进度条,例如,this 用于线程,this 用于分布式调度器。
猜你喜欢
  • 2016-08-05
  • 2023-02-01
  • 1970-01-01
  • 2014-01-21
  • 2014-01-21
  • 1970-01-01
相关资源
最近更新 更多