【发布时间】:2022-01-03 08:18:45
【问题描述】:
我正在使用Dask Bag 在一个特殊集群上运行一些简单的map-reduce计算:
import dask.bag as bag
summed_image = bag.from_sequence(my_ids).map(gen_image_from_ids).reduction(sum, sum).compute()
此代码生成一个链式计算,从from_sequence 和gen_image_from_ids 开始映射,然后将所有结果与sum 简化为一个。由于 Dask Bag 的特性,求和是在多级树中并行完成的。
我的特殊集群设置有更高的失败率,因为我的worker可以随时被杀死,并且CUP被其他更高阶的进程接管,然后在一段时间后释放。杀戮可能每 5 分钟仅在单个节点上发生一次,但我的总缩减作业可能需要 5 分钟以上。
虽然 Dask 擅长故障恢复,但我的工作有时永远不会结束。考虑如果作业树中的任何内部节点被杀死,则所有先前计算的临时中间结果都丢失了。并且计算应该从头开始。
Dask Future 对象有 replicate,但我在更高级别的 Dask Bag 或 Dataframe 上找不到类似的功能来确保数据弹性。请让我知道是否有一种通用的处理方法可以将中间结果保留在具有超高故障率的 Dask 集群中。
更新 - 我的解决方法
也许任何分布式计算系统都会遭受频繁的故障,即使系统可以从中恢复。在我的情况下,工作人员关闭本质上不是系统故障,而是由高阶进程触发的。因此,高阶进程现在不再直接杀死我的工人,而是启动一个小的 python 脚本来发送retire_worker() 命令,当它开始运行时。
如文件所述,retire_worker() 调度程序会将数据从退休工人转移到另一个可用的工人。所以我的问题暂时解决了。但是,我仍然保留这个问题,因为我认为复制冗余计算将是一种更快的解决方案,并且可以更好地利用集群中的空闲节点。
【问题讨论】:
标签: python mapreduce dask dask-distributed dask-dataframe