【问题标题】:How to pinpoint cause of Dask worker to worker connection timeout issues?如何查明 Dask 工作人员到工作人员连接超时问题的原因?
【发布时间】:2021-11-22 10:57:18
【问题描述】:

随着我的数据集越来越大(因此分区的数量和大小也越来越大),我的分布式 Dask 集群中的工作人员最终会因彼此之间的连接超时而失败。

例如,我反复看到错误日志,例如(路径和 IP 被混淆):

distributed.worker - ERROR - Worker stream died during communication: tcp://123.123.123.123:41076
Traceback (most recent call last):
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 321, in connect
    await asyncio.wait_for(comm.write(local_info), time_left())
  File "/share/apps/python/miniconda3.8/lib/python3.8/asyncio/tasks.py", line 464, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 2334, in gather_dep
    response = await get_data_from_worker(
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3753, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3730, in _get_data
    comm = await rpc.connect(worker)
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/core.py", line 1012, in connect
    comm = await connect(
  File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 325, in connect
    raise IOError(
OSError: Timed out during handshake while connecting to tcp://123.123.123.123:41076 after 10 s

根据记录的 IP 和端口,我知道这些是工作人员之间的连接错误,而不是工作人员到调度程序的错误。

堆栈跟踪不包含对我的代码(而不是我的 venv 中的 Dask)的任何引用,但我确实怀疑我在 Dask 中所做的某些事情会导致问题。较早的任务,自定义延迟函数和 DataFrame 函数均已成功完成。

根据在系统出现故障时查看日志和仪表板,我怀疑我在 groupby 聚合中遇到了问题。我已经解决了由数据混洗引起的内存不足问题,方法是对数据进行分区,使分组数据包含在每个分区中(不需要混洗)。我已经确认没有在仪表板中看到任何随机播放任务。尽管记录的堆栈跟踪仍然显示 Dask 工作人员试图使用 gather_dep 函数从另一个工作人员检索数据。

我试图更好地了解员工何时相互沟通以评估我可能正在做什么来导致问题?

逐渐增加超时配置只会使超时时间更长。工作人员在相互连接以进行通信时最终仍会死亡。

关于如何调试问题的任何其他建议?

【问题讨论】:

  • > 我确实怀疑我在 Dask 所做的某些事情会导致问题。您能详细说明一下,您是使用自定义函数还是使用标准函数,那么是哪些?
  • 可能还不够清楚,但是在对索引列和其他两个列执行 DataFrame 多列 groupby 聚合时开始出现故障。 > 根据在系统出现故障时查看日志和仪表板,我怀疑我在 groupby 聚合中遇到了问题。

标签: python dask dask-distributed


【解决方案1】:

此页面帮助描述 Dask groupby 聚合过程:https://saturncloud.io/docs/reference/dask_groupby_aggregations/

特别是,如果您的分区已经根据 groupby 列排序,您可以使用map_partitions() 单独处理每个分区。请注意分区需要足够小才能存储在内存中。我不清楚这是否是整个 DataFrame 需要在内存中,或者 Dask 是否足够小心,只需要单独在内存中的每个分区。 https://saturncloud.io/docs/reference/dask_groupby_aggregations/#use-map_partitions-instead

我已经能够使用 map_partitions 来解决我之前遇到的工作人员对工作人员的通信和超时问题。这并不能解释为什么 Dask 会在 DataFrame 已经在其中一个 groupby 列上分区时尝试与其他工作人员通信,但如果您发现自己处于类似情况,它至少可以让您继续前进。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-10-17
    • 1970-01-01
    • 1970-01-01
    • 2015-12-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多