【问题标题】:Best way to optimize data processing in Python在 Python 中优化数据处理的最佳方法
【发布时间】:2021-07-14 16:54:33
【问题描述】:

我在 Python 3.8 中有以下数据处理管道:

  • 大约 1.3TB 的原始数据存储在 SSD 上,细分为大约 80 个不同的独立类别,进一步细分为单独的 300mb 压缩 csv.gz
  • 3 个主要类,一个将原始数据清理为可读格式,第二个将所述数据聚合并进行我需要的数学运算,第三个导入前 2 个,读取每个 csv,运行所有这些进程并保存结果,遍历 csvs。我有约束,因为它是时间序列数据,所以我必须按顺序进行,因为一切都取决于每个类别的先前值(无矢量化)。我已经尽可能使用 Cython 和 Numba。

我的电脑有 12 核/24 线程。理想情况下,我希望程序的 24 个实例同时运行,每个线程上一个,每个线程从 1 个类别中顺序导出数据,并且尽可能快。

如果我只需要导出例如 3 个类别,我希望程序在 24 个线程上运行,每个实例最多可以使用 8 个线程。

首先,我制作了一个包含 3 个类的脚本,以及一个运行所有内容的 main。如果我自己运行它,它将成功导出 1 类数据,尽管速度很慢。我们将其命名为 script.py

然后,我创建了一个运行 script.py 的函数(我们将调用它parallelize()):

p = subprocess.Popen(mydir/script.py + [myargs], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, preexec_fn=os.setsid) 
p.wait()

使用这些方法,我尝试了以下方法,结果都一样平庸:

  • joblib.Parallel(n_jobs=mp.cpu_count())(延迟(parallelize)(arg) 用于 args 中的 arg)。我已经尝试过任何可能的后端,我尝试过使用 n_jobs=mp.cpu_count() / n_categories,我尝试过使用 parallel_backend 来指定设置。
  • 多处理模块,target = parallelize,然后是 p.start()
  • 相同的多处理模块,但目标 = 一个调用 joblib.Parallel(n_jobs=mp.cpu_count() / n_categories)(delayed(parallelize)(arg) for args in args)的函数

无论我怎么尝试,结果总是一样的:当我启动程序时,所有cpu核心和线程都达到100%,它立即开始导出所有类别,而且速度足够快满足我的需要。即使我只是导出 3 个类别,它也 100% 使用了所有 24 个线程,这表明它很好地利用了多线程。但是仅仅 5-10 分钟后,它突然变慢了,1 个线程保持在 100% 的使用率,另外 23 个线程的使用率下降到 10-20% 左右,它只处理 1 个类别,如果我去处理我的 Ubuntu 系统监视器我看到所有 python 实例都以 0% CPU 运行,除了 1 运行在 10% 到 16% 之间。

如果我停止导出(它保存到它到达的位置)并恢复它,同样的事情也会发生。每 5 分钟运行、停止和重新运行脚本比让它运行几天要快得多。是什么阻止了我的 CPU 一直以 100% 运行,而不仅仅是前 5 分钟?

我暂时没有在脚本的 3 个类中使用任何异步、线程、多线程或多处理,并且所述脚本中最慢的部分是迭代 csv 行。

【问题讨论】:

  • 您的内存使用情况如何?像这样的减速有时可能是由内存泄漏引起的。
  • @user144153 看起来还不错,没有使用超过它需要的东西,几 GB,每次从一个 csv 切换到下一个时都会清除它。
  • 您确定您不只是受 I/O 限制吗?与 CPU 相比,即使是 SSD 也没有那么快。当您尝试使用不同数量的线程时会发生什么?您是否考虑过 SSD 的(物理)缓存行为?无论如何,我建议你在superuser.com 上可能会有更多的运气;这里的问题似乎更有可能是“计算机如何工作”问题而不是“我的代码有什么问题”问题。
  • @KarlKnechtel 我考虑过这一点,但每个 300mb 的 csv 需要大约 30 分钟以上的时间进行迭代/处理/聚合,所以我认为瓶颈在于处理能力。实际上,当我尝试在 HD 上运行它时,它与 SSD 相比并没有慢多少。如果它是 I/O 绑定的,为什么前 5 分钟会超快?如果我将它从 24 个线程减少到 1 个线程,它只会以正常速度运行,但在开始时没有超快的 5 分钟。

标签: python multithreading multiprocessing bigdata joblib


【解决方案1】:

joblib.Parallelmultiprocessing.Pool.map 等函数提供了一种在多个内核/线程上处理任务列表的简单方法。它们通常只采用脚本/计算和一个可迭代作为参数。但是,这两个函数都按他们喜欢的方式分配任务。 Pool.map 检查可迭代对象并将其分配给内核/线程的数量,但不一定大小相等。所以你最终可能会得到核心 1 有 100 个任务,而其余的核心每个只有 10 个任务。这取决于您的可迭代对象以及这两个函数假定的充分拆分。

当 iterable 很大时,iterable 的拆分甚至会比任务的实际计算时间花费更多的时间。有时,由于拆分过程,您在任何任务开始之前就耗尽了内存。

因此,我迁移到始终使用队列并手动进行拆分。这样,您就可以完全控制拆分和消耗的内存,并且可以调试整个过程。

因此,在您的情况下,它看起来类似于:

def script(in_queue, out_queue):
    for task in iter(in_queue.get, 'STOP'):
        # do stuff with your task
        out_queue.put(result)

在你的主线程中:

if __name__ == "__main__":
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    numProc = # number of cores you like
    process = [multiprocessing.Process(target=script,
                      args=(in_queue, out_queue) for x in range(numProc)]
 
    for p in process:
        p.start()

    for category in categories:
        in_queue.put(category)

    for p in process:
        in_queue.put('STOP')

使用这种方案,所有进程都执行完全相同的操作:从队列中获取任务,进行计算并将结果放入另一个队列。如果您的核心都具有完全相同的速度,则任务将在一个核心上“按时间顺序”完成,例如:

task1 -> core1
task2 -> core2
task3 -> core1
task4 -> core2

像你这样的情况,100% 在第一个核心上,10% 在所有其他核心上,只有在几乎所有任务都完成时才会出现。

【讨论】:

  • 非常感谢您提供非常详细的回答!会试试这个
猜你喜欢
  • 2016-09-28
  • 2013-04-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-09-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多