【问题标题】:Dask - diagnostics dashboard - custom info about taskDask - 诊断仪表板 - 关于任务的自定义信息
【发布时间】:2021-05-15 16:15:36
【问题描述】:

我正在使用 Dask 来安排和运行研究批次。 那些大多会产生副作用并且非常重(从几分钟到几个小时不等)。任务之间没有通信。

在代码中它看起来像这样,首先我将所有批次传递给处理:

def process_batches(batches: Iterator[Batch], log_dir: Path):

    cluster = LocalCluster(
        n_workers=os.cpu_count(),
        threads_per_worker=1
    )

    client = Client(cluster)
    futures = []

    for batch in batches:
        futures += process_batch(batch, client, log_dir)

    progress(futures)

然后我将每个批次的重复作为任务提交:

def process_batch(batch: Batch, client: Client, log_dir: Path) -> List[Future]:
    batch_dir = log_dir.joinpath(batch.nice_hash)
    batch_futures = []

    num_workers = len(client.scheduler_info()['workers'])

    with Logger(batch_dir, clear_dir=True) as logger:
        logger.save_json(batch.as_dict, 'batch')

        for repetition in range(batch.n_repeats):
            cpu_index = repetition % num_workers

            future = client.submit(
                process_batch_repetition,
                batch,
                repetition,
                cpu_index,
                logger
            )

            batch_futures.append(future)

    return batch_futures

有没有办法将一些关于提交任务的自定义信息传递到仪表板? 我所看到的只是任务process_batch_repetition。我可以用自定义字符串替换它,这样我就可以看到目前正在处理哪些批处理配置?

【问题讨论】:

    标签: dask dask-distributed


    【解决方案1】:

    得到了 Dask 的 BDFL mrocklin 的答复。

    您可以使用 key= 关键字来指定将来的密钥。这应该 每个未来都是独一无二的。 Dask 将使用键名的前缀来 确定它在仪表板上的呈现方式。请参阅文档字符串 dask.utils.key_split 有关如何从 键。

    所以你可以这样使用它:

    future = client.submit(
        process_batch_repetition,
        batch,
        repetition,
        cpu_index,
        logger,
        key=f'{str(batch)}_repetition_{repetition}'
    )
    

    您只需为此任务传递一个唯一的字符串。有一些禁止的字符(即空格),所以预计会出现一些关键错误。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-12-25
      • 2020-10-23
      • 2015-03-29
      • 2020-07-07
      • 1970-01-01
      • 1970-01-01
      • 2017-12-31
      • 2017-05-27
      相关资源
      最近更新 更多