[Posted]: 2021-05-15 16:15:36
[Question]:
I am using Dask to schedule and run research batches. These mostly produce side effects and are quite heavy (running anywhere from a few minutes to several hours). There is no communication between tasks.
In code it looks like this. First I hand all batches over for processing:
import os
from pathlib import Path
from typing import Iterator, List

from dask.distributed import Client, Future, LocalCluster, progress

def process_batches(batches: Iterator[Batch], log_dir: Path):
    cluster = LocalCluster(
        n_workers=os.cpu_count(),
        threads_per_worker=1
    )
    client = Client(cluster)
    futures = []
    for batch in batches:
        futures += process_batch(batch, client, log_dir)
    progress(futures)
Then I submit each batch's repetitions as tasks:
def process_batch(batch: Batch, client: Client, log_dir: Path) -> List[Future]:
    batch_dir = log_dir.joinpath(batch.nice_hash)
    batch_futures = []
    num_workers = len(client.scheduler_info()['workers'])
    with Logger(batch_dir, clear_dir=True) as logger:
        logger.save_json(batch.as_dict, 'batch')
        for repetition in range(batch.n_repeats):
            cpu_index = repetition % num_workers
            future = client.submit(
                process_batch_repetition,
                batch,
                repetition,
                cpu_index,
                logger
            )
            batch_futures.append(future)
    return batch_futures
Is there a way to pass some custom information about a submitted task to the dashboard?
All I see there is the task name process_batch_repetition. Can I replace it with a custom string, so that I can see which batch configurations are currently being processed?
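One possible approach (a sketch, not part of the original post): `Client.submit` accepts a `key=` argument, and the dashboard derives the displayed task name from the key's prefix. Keys must be unique per task, so a hypothetical helper (the name `batch_task_key` is an assumption here) can combine a batch label and repetition with a random suffix:

```python
from uuid import uuid4

def batch_task_key(batch_label: str, repetition: int) -> str:
    """Build a task key whose readable prefix identifies the batch.

    The uuid suffix keeps keys unique across repeated submissions,
    which Dask requires for distinct tasks.
    """
    return f"{batch_label}-rep{repetition}-{uuid4().hex}"

# Hypothetical use inside process_batch:
# future = client.submit(
#     process_batch_repetition, batch, repetition, cpu_index, logger,
#     key=batch_task_key(batch.nice_hash, repetition),
# )
```

With keys like `cfg42-rep3-<uuid>`, the dashboard would group and label these tasks by the human-readable prefix instead of the bare function name; whether this fits depends on how you want configurations summarized.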
[Comments]:
Tags: dask dask-distributed