【问题标题】:Dask Distributed: access client futures from separate processDask Distributed:从单独的进程访问客户期货
【发布时间】:2020-04-19 22:52:33
【问题描述】:

我用Dask Distributed启动了很多模拟:

from time import sleep
from distributed import Client, as_completed

def simulation(x):
    """ Proxy function for simulation """
    sleep(60 * 60 * 24)  # wait one day
    return hash(x)

def save(result):
    with open("result", "w") as f:
        print(result, file=f)

if __name__ == "__main__":
    client = Client("localhost:8786")
    futures = client.map(simulation, range(1000))

    for future in as_completed(future):
        result = future.result()
        save(result)  

但是,这段代码有一个错误:open("result", "w") 应该是open(str(result), "w")。我想纠正这个错误,客户期货的重新处理。

但是,我不知道有什么方法可以在不通过键盘中断停止 Python 进程的情况下执行此操作,而不是将作业重新提交到 Dask 集群。我不想这样做,因为这些模拟需要几天时间。

我想访问客户拥有的所有期货并保存所有现有结果。我该如何做到这一点?

可能相关的问题

【问题讨论】:

    标签: python dask dask-distributed


    【解决方案1】:

    client.has_what 是您正在寻找的方法:

    from distributed import Client, Future
    
    if __name__ == "__main__":
        client = Client("localhost:8786")
        futures = [Future(key) for keys in client.has_what().values() for key in keys]
    
        for future in as_completed(futures):
            ...
    

    【讨论】:

      猜你喜欢
      • 2012-05-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多