【问题标题】:Multiprocessing code running much slower than single-threaded code多处理代码运行速度比单线程代码慢得多
【发布时间】:2021-09-02 13:55:52
【问题描述】:

我正在尝试学习如何在 Python 中使用 multiprocessing 包,我编写了以下代码,它随机生成一个大型二维数组,然后计算出每行中有多少个数字在指定的间隔内(在这种情况下,介于 4 和 8 之间):

import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[10000000, 5])
    print(data[:5])

    start_time = time.perf_counter()

    # With parallelisation
    with mp.Pool(mp.cpu_count()) as pool:
        results = [ pool.apply(how_many_within_range, args=(row, 4, 8)) \
                       for row in data ]

    # Without parallelisation
    # results = [ how_many_within_range(row, 4, 8) for row in data ]

    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

如果没有多处理,代码运行大约需要 40 秒,但是使用它,程序会慢得多,并且不会在实际时间内完成。我很确定我正确地遵循了我正在使用的教程,那么我做错了什么?

【问题讨论】:

  • pool.apply 是一个阻塞调用。使用 pool.apply_async() 并开始并行运行。不过,apply_async 的返回结构有点不同。

标签: python multiprocessing python-multiprocessing


【解决方案1】:

.apply() 在这种情况下是错误的函数。 .starmap() 更合适,但对于这种简单的情况,启动进程和在进程间传输大量数据的开销会使其总体速度变慢。

import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[1000000, 5])
    print(data[:5])

    # With parallelisation
    start_time = time.perf_counter()
    with mp.Pool() as pool:
        results = pool.starmap(how_many_within_range, ((row,4,8) for row in data), chunksize=1000)
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

    # Without parallelisation
    start_time = time.perf_counter()
    results = [ how_many_within_range(row, 4, 8) for row in data ]
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

输出:

[[1 4 8 9 2]
 [9 1 6 7 0]
 [0 7 6 8 4]
 [4 5 6 9 9]
 [6 6 9 9 1]]
Time elapsed: 3.3232607
[2, 2, 4, 3, 2]
Time elapsed: 2.4664016999999996
[2, 2, 4, 3, 2]

【讨论】:

  • 非常感谢,这绝对好多了!是否有可能解释 apply 和 starmap 之间的区别(我也尝试过使用 apply_async 但这似乎没有多大帮助)?我已经阅读了文档,但我仍然不明白何时使用其中一个...
  • @MrLatinNerd Apply 在池中运行一个函数并等待结果。 Starmap 接受一个提供参数的迭代器,并使用每组参数并行调用函数。 chunksize 参数为每​​个要处理的进程提供一组参数。
  • 太棒了,感谢您的清晰解释!
【解决方案2】:

documentation 看来,Pool.apply() 正在阻塞,因此您获得了启动进程的开销,但没有获得并行性。

【讨论】:

    【解决方案3】:

    为什么需要在如此简单的函数中使用多处理,甚至使用 numpy 数组? 尝试使用此代码

    %%timeit
    np.sum((data>=4)&(data<=8), axis=1)
    198 ms ± 3.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    

    在for循环中不需要遍历数组元素 并立即执行

    【讨论】:

    • 来自问题:“......我正在尝试学习如何使用多处理包......”。
    • 是的 - 我知道这里不需要多处理,我只是在测试它并试图让它工作。
    猜你喜欢
    • 2019-10-15
    • 2011-08-27
    • 1970-01-01
    • 2016-03-06
    • 1970-01-01
    • 2016-10-23
    • 1970-01-01
    • 1970-01-01
    • 2016-04-07
    相关资源
    最近更新 更多