[Posted]: 2020-07-14 18:02:42
[Problem description]:
When using dask distributed, I need to save a dask array to HDF5. My situation is very similar to the one described in this issue: https://github.com/dask/dask/issues/3351. Basically, this code works:
import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock

def create_and_store_dask_array():
    data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
    data.to_hdf5('test.h5', '/test')
    # this fails too
    # f = h5py.File('test.h5', 'w')
    # dset = f.create_dataset('/matrix', shape=data.shape)
    # da.store(data, dset)
    # f.close()

create_and_store_dask_array()
However, as soon as I involve the distributed scheduler, I get TypeError: can't pickle _thread._local objects.
import dask.array as da
import h5py
from dask.utils import SerializableLock
from dask.distributed import Client, LocalCluster, progress, performance_report

def create_and_store_dask_array():
    data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
    data.to_hdf5('test.h5', '/test')
    # this fails too
    # f = h5py.File('test.h5', 'w')
    # dset = f.create_dataset('/matrix', shape=data.shape)
    # da.store(data, dset)
    # f.close()

cluster = LocalCluster(n_workers=35, threads_per_worker=1)
client = Client(cluster)
create_and_store_dask_array()
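The error itself can be reproduced with the standard library alone. A minimal sketch, assuming (as the traceback suggests) that the failure comes from thread-local state held by the open HDF5 file handle, which the distributed scheduler must pickle in order to ship the task graph to worker processes:

```python
import pickle
import threading

# threading.local() is the same _thread._local type named in the
# TypeError; pickling it fails, just as pickling an h5py handle
# that carries such state fails when tasks are sent to workers
try:
    pickle.dumps(threading.local())
    raised = False
except TypeError:
    raised = True

print(raised)  # True: the object cannot be pickled
```

This is why the same code runs under the default threaded scheduler (threads share the process, so nothing is pickled) but breaks once a distributed Client is created.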
I am currently working around this by submitting my computation to the scheduler in small pieces, collecting the results in memory, and saving the array with h5py, but this is very, very slow. Can anyone suggest a good way around this problem? The issue discussion suggests that xarray can take a dask array and write it to an HDF5 file, although this also appears to be slow.
import xarray as xr
import netCDF4
import dask.array as da
import h5py
from dask.utils import SerializableLock
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=35, threads_per_worker=1)
client = Client(cluster)

data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
#data.to_hdf5('test.h5', '/test')
test = xr.DataArray(data, dims=None, coords=None)
# save as hdf5
test.to_netcdf("test.h5", mode='w', format="NETCDF4")
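The piece-by-piece workaround described above can be sketched as follows. This is only an illustration, not the actual code: a plain loop over the array's blocks stands in for the submissions to the scheduler (with distributed, each `.compute()` would be a `client.compute(...)` round trip), the results are gathered into one in-memory array, and only then is the file written from the client process:

```python
import numpy as np
import dask.array as da

# small array for illustration; the real one is (1000, 1000)
data = da.random.normal(10, 0.1, size=(200, 200), chunks=(100, 100))

# collect the results in memory, one block at a time
out = np.empty(data.shape, dtype=data.dtype)
for i in range(data.numblocks[0]):
    for j in range(data.numblocks[1]):
        block = data.blocks[i, j].compute()  # one small piece at a time
        out[i * 100:(i + 1) * 100, j * 100:(j + 1) * 100] = block

# only now write from the client process, e.g. with h5py:
# import h5py
# with h5py.File('test.h5', 'w') as f:
#     f.create_dataset('/test', data=out)
```

Because every block makes a full round trip through the client, this avoids pickling the file handle but serializes all the data twice, which matches how slow it is in practice.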
If anyone can suggest a way around this problem, I would be very interested in finding a solution (especially one that does not involve adding extra dependencies).
Thanks in advance,
[Discussion]:
Tags: dask dask-distributed dask-delayed