【发布时间】:2021-07-14 16:54:33
【问题描述】:
我在 Python 3.8 中有以下数据处理管道:
- 大约 1.3TB 的原始数据存储在 SSD 上,细分为大约 80 个不同的独立类别,进一步细分为单独的 300mb 压缩 csv.gz
- 3 个主要类,一个将原始数据清理为可读格式,第二个将所述数据聚合并进行我需要的数学运算,第三个导入前 2 个,读取每个 csv,运行所有这些进程并保存结果,遍历 csvs。我有约束,因为它是时间序列数据,所以我必须按顺序进行,因为一切都取决于每个类别的先前值(无矢量化)。我已经尽可能使用 Cython 和 Numba。
我的电脑有 12 核/24 线程。理想情况下,我希望程序的 24 个实例同时运行,每个线程上一个,每个线程从 1 个类别中顺序导出数据,并且尽可能快。
如果我只需要导出例如 3 个类别,我希望程序在 24 个线程上运行,每个实例最多可以使用 8 个线程。
首先,我制作了一个包含 3 个类的脚本,以及一个运行所有内容的 main。如果我自己运行它,它将成功导出 1 类数据,尽管速度很慢。我们将其命名为 script.py。
然后,我创建了一个运行 script.py 的函数(我们将调用它parallelize()):
p = subprocess.Popen(mydir/script.py + [myargs], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid)
p.wait()
使用这些方法,我尝试了以下方法,结果都一样平庸:
- joblib.Parallel(n_jobs=mp.cpu_count())(延迟(parallelize)(arg) 用于 args 中的 arg)。我已经尝试过任何可能的后端,我尝试过使用 n_jobs=mp.cpu_count() / n_categories,我尝试过使用 parallel_backend 来指定设置。
- 多处理模块,target = parallelize,然后是 p.start()
- 相同的多处理模块,但目标 = 一个调用 joblib.Parallel(n_jobs=mp.cpu_count() / n_categories)(delayed(parallelize)(arg) for args in args)的函数
无论我怎么尝试,结果总是一样的:当我启动程序时,所有cpu核心和线程都达到100%,它立即开始导出所有类别,而且速度足够快满足我的需要。即使我只是导出 3 个类别,它也 100% 使用了所有 24 个线程,这表明它很好地利用了多线程。但是仅仅 5-10 分钟后,它突然变慢了,1 个线程保持在 100% 的使用率,另外 23 个线程的使用率下降到 10-20% 左右,它只处理 1 个类别,如果我去处理我的 Ubuntu 系统监视器我看到所有 python 实例都以 0% CPU 运行,除了 1 运行在 10% 到 16% 之间。
如果我停止导出(它保存到它到达的位置)并恢复它,同样的事情也会发生。每 5 分钟运行、停止和重新运行脚本比让它运行几天要快得多。是什么阻止了我的 CPU 一直以 100% 运行,而不仅仅是前 5 分钟?
我暂时没有在脚本的 3 个类中使用任何异步、线程、多线程或多处理,并且所述脚本中最慢的部分是迭代 csv 行。
【问题讨论】:
-
您的内存使用情况如何?像这样的减速有时可能是由内存泄漏引起的。
-
@user144153 看起来还不错,没有使用超过它需要的东西,几 GB,每次从一个 csv 切换到下一个时都会清除它。
-
您确定您不只是受 I/O 限制吗?与 CPU 相比,即使是 SSD 也没有那么快。当您尝试使用不同数量的线程时会发生什么?您是否考虑过 SSD 的(物理)缓存行为?无论如何,我建议你在superuser.com 上可能会有更多的运气;这里的问题似乎更有可能是“计算机如何工作”问题而不是“我的代码有什么问题”问题。
-
@KarlKnechtel 我考虑过这一点,但每个 300mb 的 csv 需要大约 30 分钟以上的时间进行迭代/处理/聚合,所以我认为瓶颈在于处理能力。实际上,当我尝试在 HD 上运行它时,它与 SSD 相比并没有慢多少。如果它是 I/O 绑定的,为什么前 5 分钟会超快?如果我将它从 24 个线程减少到 1 个线程,它只会以正常速度运行,但在开始时没有超快的 5 分钟。
标签: python multithreading multiprocessing bigdata joblib