【问题标题】:Difference between two datasets with Dask and Xarray使用 Dask 和 Xarray 的两个数据集之间的差异
【发布时间】:2021-05-19 04:54:51
【问题描述】:

我需要使用 Dask 和 Xarray 计算两个数据集(每月重新采样的两个每日变量)之间的差异。这是我的代码:

def diff(path_1,path_2):
    import xarray as xr
    max_v=xr.open_mfdataset(path_1, combine='by_coords', concat_dim="time", parallel=True)['variable_1'].resample({'time': '1M'}).max()
    min_v=xr.open_mfdataset(path_2, combine='by_coords', concat_dim="time", parallel=True)['variable_2'].resample({'time': '1M'}).min()
    
    return (max_v-min_v).compute()
        
future = client.submit(diff,path_1,path_2)
diff = client.gather(future)

我也试过这个:

%%time
def max_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path, combine='by_coords', concat_dim="time", parallel=True)
    max_v=multi_file_dataset['variable_1'].resample(time='1M').max(dim='time')
    return max_v.compute()

def min_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path, combine='by_coords', concat_dim="time", parallel=True)
    min_v=multi_file_dataset['variable_2'].resample(time='1M').min(dim='time')
    return min_v.compute()

futures=[]
future = client.submit(max_temp,path1)
futures.append(future)
future = client.submit(min_temp,path2)
futures.append(future)
results = client.gather(futures)

diff = results[0]-results[1]

但我注意到在 getitem-nanmax 和 getitem-nanmin 的最后一步中计算变得非常缓慢(例如 1974 年到 1980 年)。

这里是集群配置:

cluster = SLURMCluster(walltime='1:00:00',cores=5,memory='5GB')
cluster.scale(jobs=10)

每个数据集由几个文件组成:总大小=7GB

有没有更好的方法来实现这种计算?

谢谢

【问题讨论】:

  • 次要问题,但变量和函数使用相同的名称并不是一个好主意,因此在您的第一个 sn-p 中考虑区分 diff var/function。
  • @SultanOrazbayev 感谢您的建议。除此之外,您认为实现还可以吗?

标签: dask python-xarray dask-distributed


【解决方案1】:

不是 100% 确定这适用于您的情况,但如果没有 mwe,很难做得更好。所以,我怀疑xarray 使用的.compute() 可能与client.submit 冲突,因为现在计算正在工作人员身上进行,我不确定它是否可以在对等方之间正确分配工作(但这是一个怀疑,我不确定)。所以解决这个问题的一种方法是将计算放到主脚本中(因为xarray 将与背景中的dask 集成),所以也许这会起作用:

import xarray as xr

max_v=xr.open_mfdataset(path_1, combine='by_coords', concat_dim="time", parallel=True, chunks={'time': 10})['variable_1'].resample({'time': '1M'}).max()
min_v=xr.open_mfdataset(path_2, combine='by_coords', concat_dim="time", parallel=True, chunks={'time': 10})['variable_2'].resample({'time': '1M'}).min()
    
diff_result = (max_v-min_v).compute()

下面是不同数据集上的mwe

import xarray as xr

# chunks option will create dask array
ds = xr.tutorial.open_dataset('rasm', decode_times=True, chunks={'time': 10})

# these are lazy calculations
max_v = ds['Tair'].resample({'time': '1M'}).max()
min_v = ds['Tair'].resample({'time': '1M'}).min()

# this will use dask scheduler in the background
diff_result = (max_v-min_v).compute()

# since the data refers to the same variable, all the results will be either 0 or `nan` (if the variable was not available in that time/x/y combination)

【讨论】:

  • 我尝试了第一个代码 sn-p(因为我有两个不同的变量),但它似乎无法正常工作......
  • 您是否添加了chunks 选项?您看到的错误是什么?
  • 是的,我添加了该选项。计算花费了太多时间,实际上我没有从监控 UI 中看到任何计算
  • 嗯,sn-p里我没加,但是要启动集群。是你做的吗?没有mwe 就很难调试问题。您可以使用我的mwe 作为起点并重现您的问题吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-13
相关资源
最近更新 更多