【问题标题】:Distributed chained computing with Dask on a high failure-rate cluster?在高故障率集群上使用 Dask 进行分布式链式计算?
【发布时间】:2022-01-03 08:18:45
【问题描述】:

我正在使用Dask Bag 在一个特殊集群上运行一些简单的map-reduce计算:

import dask.bag as bag

summed_image = bag.from_sequence(my_ids).map(gen_image_from_ids).reduction(sum, sum).compute()

此代码生成一个链式计算,从from_sequencegen_image_from_ids 开始映射,然后将所有结果与sum 简化为一个。由于 Dask Bag 的特性,求和是在多级树中并行完成的。

我的特殊集群设置有更高的失败率,因为我的worker可以随时被杀死,并且CUP被其他更高阶的进程接管,然后在一段时间后释放。杀戮可能每 5 分钟仅在单个节点上发生一次,但我的总缩减作业可能需要 5 分钟以上。

虽然 Dask 擅长故障恢复,但我的工作有时永远不会结束。考虑如果作业树中的任何内部节点被杀死,则所有先前计算的临时中间结果都丢失了。并且计算应该从头开始。

Dask Future 对象有 replicate,但我在更高级别的 Dask Bag 或 Dataframe 上找不到类似的功能来确保数据弹性。请让我知道是否有一种通用的处理方法可以将中间结果保留在具有超高故障率的 Dask 集群中。

更新 - 我的解决方法

也许任何分布式计算系统都会遭受频繁的故障,即使系统可以从中恢复。在我的情况下,工作人员关闭本质上不是系统故障,而是由高阶进程触发的。因此,高阶进程现在不再直接杀死我的工人,而是启动一个小的 python 脚本来发送retire_worker() 命令,当它开始运行时。

如文件所述,retire_worker() 调度程序会将数据从退休工人转移到另一个可用的工人。所以我的问题暂时解决了。但是,我仍然保留这个问题,因为我认为复制冗余计算将是一种更快的解决方案,并且可以更好地利用集群中的空闲节点。

【问题讨论】:

    标签: python mapreduce dask dask-distributed dask-dataframe


    【解决方案1】:

    这可能不是您正在寻找的解决方案,但一种选择是将任务序列分成足够小的批次,以确保任务及时完成(或快速从头开始重新执行) )。

    可能是这样的:

    import dask.bag as db
    from toolz import partition_all
    
    n_per_chunk = 100 # just a guess, the best number depends on the case
    tasks = list(partition_all(n_per_chunk, my_ids))
    
    results = []
    for t in tasks:
        summed_image = (
            db
            .from_sequence(my_ids)
            .map(gen_image_from_ids)
            .reduction(sum, sum)
            .compute()
        )
        results.append(summed_image)
    
    summed_image = sum(results) # final result
    

    关于在失败时重新启动工作流(或可能并行启动较小的任务),这里还有其他一些事情需要记住,但希望这可以为您提供一个可行的解决方案的起点。

    【讨论】:

    • 我试过你的代码,但速度很慢,因为 summed_images 是按顺序生成的。然而,这是鼓舞人心的。我认为一种方法是将代码中的 .compute() 替换为非阻塞 .persist() 并在任何可用时收集结果。
    • 是的,有很多方法可以从这里开始......哪种方法更好取决于具体情况。
    【解决方案2】:

    更新:稍后进行更多试验——这个答案并不理想,因为client.replicate() 命令被阻塞。我怀疑它需要在制作副本之前完成所有期货 - 这是不需要的,因为 1. 任何中间节点都可以在一切就绪之前断开连接,以及 2. 它会阻止其他任务异步运行。我需要其他方法来制作副本。

    经过大量试验,我找到了一种在链式计算期间复制中间结果以实现数据冗余的方法。请注意并行 reduction 函数是 Dask Bag 功能,它不直接支持 replicate 功能。但是,正如 Dask 文档所述,可以通过 replicate 低级 Dask Future 对象来提高弹性。

    按照@SultanOrazbayev 的帖子手动执行部分和,使用persist() 函数将部分和保留在集群内存中,如评论中一样,返回的项目本质上是一个Dask Future

    import dask.bag as db
    from dask.distributed import futures_of
    from toolz import partition_all
    
    n_per_chunk = 100 # just a guess, the best number depends on the case
    tasks = list(partition_all(n_per_chunk, my_ids))
    
    bags = []
    for t in tasks:
        summed_image = (
            db
            .from_sequence(my_ids)
            .map(gen_image_from_ids)
            .reduction(sum, sum)
            .persist()
        )
        bags.append(summed_image)
    
    futures = futures_of(bags)  # This can only be called on the .persist() result
    

    然后我可以复制这些远程中间部分和,并且对sum 期货感到更安全以获得最终结果:

    client.replicate(futures, 5) # Improve resiliency by replicating to 5 workers
    summed_image = client.submit(sum, futures).result()  # The only line that blocks for the final result
    

    这里我觉得 5 的副本对于我的集群来说是稳定的,尽管更高的值会导致更高的网络开销来在工作人员之间传递副本。

    这可行,但可能会有所改进,例如如何对中间结果执行并行归约(求和),尤其是在有很多任务时。请留下您的建议。

    【讨论】:

      猜你喜欢
      • 2011-03-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-19
      • 2018-03-16
      • 2011-12-09
      • 2019-12-24
      • 2020-12-29
      相关资源
      最近更新 更多