【问题标题】:Is it possible to select workers for specific tasks in Dask?是否可以在 Dask 中为特定任务选择工人?
【发布时间】:2020-05-28 15:19:07
【问题描述】:

我有一个使用 Dask 在我的 Kubernetes 集群上运行的进程,该进程由两个 map-reduce 阶段组成,但是跨节点的两个映射都可能将大量大文件下载到每个工作人员。为了避免让两台不同的机器在两个不同的地图步骤上处理相同的文件子集,是否可以确定性地选择哪些工作人员为相同的工作获取哪些参数?从概念上讲,我想要的可能是这样的:

workers : List = client.get_workers();
#                       ^^^^^^^^^^^
filenames : List[str] = get_filenames(); # input data to process

# map each file to a specific worker
file_to_worker = { filename : workers[hash(filename) % len(workers)] for filename in filenames }

# submit each file, specifying which worker should be assigned the task
futures = [client.submit(my_func, filename, worker=file_to_worker[filename]) for filename in filenames]
#                                           ^^^^^^ 

这样的事情可以让我将相同文件的不同计算步骤定向到相同的节点,从而无需对文件进行第二次缓存。

【问题讨论】:

    标签: python dask dask-kubernetes


    【解决方案1】:

    是的,您可以将函数提交给特定的工作人员:

    c.run(func, workers=[WorkerA, WorkerB, WorkerC])

    您还可以将元数据资源附加到工作人员并使用这些定义而不是特定主机名提交:

    data = [client.submit(load, fn) for fn in filenames]
    processed = [client.submit(process, d, resources={'GPU': 1}) for d in data]
    final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})
    

    有关设置信息,请查看resource docs

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-01-19
      • 1970-01-01
      • 1970-01-01
      • 2017-08-07
      • 1970-01-01
      • 2020-09-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多