【问题标题】:Loading large datasets with dask使用 dask 加载大型数据集
【发布时间】:2018-11-20 23:14:18
【问题描述】:

我处于具有集群、紧密耦合互连和支持 Lustre 文件系统的 HPC 环境中。我们一直在探索如何利用 Dask 不仅提供计算,还充当分布式缓存来加速我们的工作流程。我们专有的数据格式是 n 维和规则的,我们编写了一个惰性读取器来传递给 from_array/from_delayed 方法。

我们在跨 Dask 集群加载和持久化大于内存的数据集时遇到了一些问题。

以 hdf5 为例:

# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected

我们还从我们自己的一些文件文件格式中加载了数据集(使用 slicing、dask.delayed 和 from_delayed),并且随着文件大小的增加,性能也出现了类似的下降。

我的问题:使用 Dask 作为分布式缓存是否存在固有的瓶颈?是否会强制所有数据通过调度程序汇集?工人是否能够利用 Lustre,或者功能和/或 I/O 是否以某种方式序列化?如果是这种情况,在海量数据集上不调用persist,让Dask在需要时处理数据和计算会更有效吗?

【问题讨论】:

    标签: hdf5 dask


    【解决方案1】:
    • 使用 Dask 作为分布式缓存是否存在固有的瓶颈?

      每个系统都有瓶颈,但听起来你还没有接近我期望 Dask 遇到的瓶颈。 我怀疑你遇到了其他问题。

    • 是否会强制所有数据通过调度程序汇集?

      不,工作人员可以执行自行加载数据的功能。然后,这些数据将保留在工作人员身上。

    • worker 是否能够利用 Lustre,或者函数和/或 I/O 是否以某种方式序列化?

      Worker 只是 Python 进程,因此如果在您的集群上运行的 Python 进程可以利用 Lustre(几乎可以肯定是这种情况),那么是的,Dask Workers 可以利用 Lustre。

    • 如果是这样的话,在海量数据集上不调用persist,让Dask在需要的时候处理数据和计算会更有效吗?

      这当然很常见。这里的权衡是在 NFS 的分布式带宽和分布式内存的可用性之间。

    在你的位置上,我会使用 Dask 的诊断来找出占用这么多时间的原因。您可能需要仔细阅读有关 understanding performance 的文档,尤其是有关 dashboard 的部分。该部分有一个可能特别有用的视频。我会问两个问题:

    1. 工作人员是否一直在运行任务? (状态页面,任务流图)
    2. 在这些任务中,什么占用了时间? (个人资料页面)

    【讨论】:

    • 我在 HPC Slurm 集群上使用 Dask,我正在尝试 client.persist(df) 一个跨许多工作人员的巨大 memapped numpy 数组,但不幸的是它似乎是通过调度程序汇集的,它崩溃了一段时间后由于内存不足错误
    猜你喜欢
    • 2020-06-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-28
    • 2022-08-13
    • 2019-12-19
    • 1970-01-01
    相关资源
    最近更新 更多