【发布时间】:2018-11-20 23:14:18
【问题描述】:
我处于具有集群、紧密耦合互连和支持 Lustre 文件系统的 HPC 环境中。我们一直在探索如何利用 Dask 不仅提供计算,还充当分布式缓存来加速我们的工作流程。我们专有的数据格式是 n 维和规则的,我们编写了一个惰性读取器来传递给 from_array/from_delayed 方法。
我们在跨 Dask 集群加载和持久化大于内存的数据集时遇到了一些问题。
以 hdf5 为例:
# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected
我们还从我们自己的一些文件文件格式中加载了数据集(使用 slicing、dask.delayed 和 from_delayed),并且随着文件大小的增加,性能也出现了类似的下降。
我的问题:使用 Dask 作为分布式缓存是否存在固有的瓶颈?是否会强制所有数据通过调度程序汇集?工人是否能够利用 Lustre,或者功能和/或 I/O 是否以某种方式序列化?如果是这种情况,在海量数据集上不调用persist,让Dask在需要时处理数据和计算会更有效吗?
【问题讨论】: