【发布时间】:2021-11-22 10:57:18
【问题描述】:
随着我的数据集越来越大(因此分区的数量和大小也越来越大),我的分布式 Dask 集群中的工作人员最终会因彼此之间的连接超时而失败。
例如,我反复看到错误日志,例如(路径和 IP 被混淆):
distributed.worker - ERROR - Worker stream died during communication: tcp://123.123.123.123:41076
Traceback (most recent call last):
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 321, in connect
await asyncio.wait_for(comm.write(local_info), time_left())
File "/share/apps/python/miniconda3.8/lib/python3.8/asyncio/tasks.py", line 464, in wait_for
raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 2334, in gather_dep
response = await get_data_from_worker(
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3753, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 385, in retry_operation
return await retry(
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 3730, in _get_data
comm = await rpc.connect(worker)
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/core.py", line 1012, in connect
comm = await connect(
File "/path/to/my/code/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 325, in connect
raise IOError(
OSError: Timed out during handshake while connecting to tcp://123.123.123.123:41076 after 10 s
根据记录的 IP 和端口,我知道这些是工作人员之间的连接错误,而不是工作人员到调度程序的错误。
堆栈跟踪不包含对我的代码(而不是我的 venv 中的 Dask)的任何引用,但我确实怀疑我在 Dask 中所做的某些事情会导致问题。较早的任务,自定义延迟函数和 DataFrame 函数均已成功完成。
根据在系统出现故障时查看日志和仪表板,我怀疑我在 groupby 聚合中遇到了问题。我已经解决了由数据混洗引起的内存不足问题,方法是对数据进行分区,使分组数据包含在每个分区中(不需要混洗)。我已经确认没有在仪表板中看到任何随机播放任务。尽管记录的堆栈跟踪仍然显示 Dask 工作人员试图使用 gather_dep 函数从另一个工作人员检索数据。
我试图更好地了解员工何时相互沟通以评估我可能正在做什么来导致问题?
逐渐增加超时配置只会使超时时间更长。工作人员在相互连接以进行通信时最终仍会死亡。
关于如何调试问题的任何其他建议?
【问题讨论】:
-
> 我确实怀疑我在 Dask 所做的某些事情会导致问题。您能详细说明一下,您是使用自定义函数还是使用标准函数,那么是哪些?
-
可能还不够清楚,但是在对索引列和其他两个列执行 DataFrame 多列 groupby 聚合时开始出现故障。 > 根据在系统出现故障时查看日志和仪表板,我怀疑我在 groupby 聚合中遇到了问题。
标签: python dask dask-distributed