【问题标题】:python - multiprocessing issues with class fields and methodspython - 类字段和方法的多处理问题
【发布时间】:2017-10-31 15:54:09
【问题描述】:

我需要在一个数据分析 python 项目中同时使用类和多处理功能,但我在 Google 上没有找到很好的示例。

我的基本想法 - 这可能是错误的 - 是创建一个具有大变量的类(在我的例子中是 pandas 数据框),然后定义一个计算操作的方法(在这种情况下是总和) .

import multiprocessing
import time

class C:
    def __init__(self):
        self.__data = list(range(0, 10**7))

    def func(self, nums):
        return sum(nums)

    def start_multi(self):
        for n_procs in range(1, 4):
            print()
            time_start = time.clock()
            chunks = [self.__data[(i-1)*len(self.__data)// n_procs: (i)*len(self.__data)// n_procs] for i in range(1, n_procs+1)]
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks )
            results.wait()
            pool.close()
            results = results.get()
            print(sum(results))
            print("n_procs", n_procs, "total time: ", time.clock() - time_start)

print('sum(list(range(0, 10**7)))', sum(list(range(0, 10**7))))
c = C()
c.start_multi()

代码不能正常工作:我得到以下打印输出

sum(list(range(0, 10**7))) 49999995000000

49999995000000
n_procs 1 total time:  0.45133500000000026

49999995000000
n_procs 2 total time:  0.8055279999999954

49999995000000
n_procs 3 total time:  1.1330870000000033

即计算时间增加而不是减少。那么,这段代码的错误是什么?

但我也担心 RAM 的使用,因为当创建变量块时,self.__data RAM 的使用会翻倍。在处理多处理代码时,更具体地说,在这段代码中,是否有可能避免这种内存浪费? (我保证将来我会把所有东西都放在 Spark 上 :))

【问题讨论】:

  • 您似乎理解得很准确,您必须创建副本并将它们发送到不同的进程。这不可能比普通的sum 更快。顺便说一句,您在 __data 属性中使用双下划线是否有特殊原因?
  • 但是,这与你使用类无关。您应该阅读multiprocessing 文档中关于共享状态的部分。这不是微不足道的。

标签: python multiprocessing pool


【解决方案1】:

看起来这里有一些事情在起作用:

  1. 分块操作非常慢。在我的计算机上,chunks 的生成花费了大约 16% 的时间来处理具有多个进程的案例。单进程、非池版本没有这种开销。
  2. 您正在向您的流程发送大量数据。 chunks 数组是需要获取 pickled 并发送到新进程的范围的所有原始数据。发送开始和结束索引而不是发送所有原始数据会容易得多。
  3. 一般来说,如果您将计时器放入您的func,您会发现大部分时间都没有花在上面。这就是为什么你没有看到加速。大部分时间都花在了分块、酸洗、分叉和其他开销上。

作为替代方案,您应该尝试切换分块技术,只计算开始和结束数字,并避免发送太多数据。

接下来,我建议做一些比计算总和更难的计算。例如,您可以尝试计算素数。这是一个示例,我们使用来自here 的简单素数计算,并且我们使用了修改后的分块技术。否则,尽量保持代码不变。

import multiprocessing
import time
from math import sqrt; from itertools import count, islice

# credit to https://stackoverflow.com/a/27946768
def isPrime(n):
    return n > 1 and all(n%i for i in islice(count(2), int(sqrt(n)-1)))

limit = 6
class C:
    def __init__(self):
        pass

    def func(self, start_end_tuple):
        start, end = start_end_tuple
        primes = []
        for x in range(start, end):
            if isPrime(x):
                primes.append(x)
        return len(primes)

    def get_chunks(self, total_size, n_procs):
        # start and end value tuples
        chunks = []

        # Example: (10, 5) -> (2, 0) so 2 numbers per process
        # (10, 3) -> (3, 1) or here the first process does 4 and the others do 3
        quotient, remainder = divmod(total_size, n_procs)
        current_start = 0
        for i in range(0, n_procs):
            my_amount = quotient
            if i == 0:
                # somebody needs to do extra
                my_amount += remainder
            chunks.append((current_start, current_start + my_amount))
            current_start += my_amount
        return chunks

    def start_multi(self):
        for n_procs in range(1, 4):
            time_start = time.clock()
            # chunk the start and end indices instead
            chunks = self.get_chunks(10**limit, n_procs)
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks)
            results.wait()
            results = results.get()
            print(sum(results))
            time_delta = time.clock() - time_start
            print("n_procs {} time {}".format(n_procs, time_delta))

c = C()
time_start = time.clock()
print("serial func(...) = {}".format(c.func((1, 10**limit))))
print("total time {}".format(time.clock() - time_start))
c.start_multi()

这应该会加速多个进程。假设你有它的核心。

【讨论】:

    猜你喜欢
    • 2011-06-28
    • 2019-02-07
    • 1970-01-01
    • 2021-10-27
    • 1970-01-01
    • 1970-01-01
    • 2015-05-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多