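I can show you how to do it, but you will be very unhappy with the results. There are two issues. The first is that there is a certain amount of overhead in creating a process pool and in passing arguments to your worker function and getting the return values back, because the worker function "lives" in a different address space and the arguments and return values must be pickled and unpickled for transmission. The worker function, formating in your case, therefore needs to be non-trivial for that overhead to be worth paying. Second, your worker function uses numpy, which, depending on what it is doing, sometimes already runs some of its operations in parallel internally. Layering your own multiprocessing on top of that buys you nothing. Your worker function is short and the numpy methods it calls are written in C and execute quickly, so this is an example of a rather trivial worker function.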
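To get a rough feel for the serialization part of that overhead on its own, here is a minimal sketch that compares a direct call with a call wrapped in a pickle round trip of the arguments and the result. It only approximates what a pool does (it leaves out the actual transfer between processes and the task scheduling), and the helper names `time_call` and `round_trip` are made up for this illustration.

```python
import pickle
import time

import numpy as np


def formating(a, b):
    # Same worker function as in the question.
    x = np.sort(b)
    l = np.digitize(a, x)
    return np.bincount(l, weights=a)


def time_call(func, repeats=1000):
    # Hypothetical helper: average wall-clock seconds per call.
    start = time.perf_counter()
    for _ in range(repeats):
        func()
    return (time.perf_counter() - start) / repeats


numbers = np.array([3, 4, 5, 7, 8, 10, 20])
limit = np.array([0, 2, 5, 12, 15])


def round_trip():
    # Mimic what has to happen around a pool task: the arguments are
    # pickled and unpickled, the function runs, and the result is
    # pickled and unpickled again on the way back.
    args = pickle.loads(pickle.dumps((numbers, limit)))
    result = formating(*args)
    return pickle.loads(pickle.dumps(result))


direct = time_call(lambda: formating(numbers, limit))
with_pickle = time_call(round_trip)
print(f"direct call:            {direct * 1e6:.1f} microseconds")
print(f"with pickle round trip: {with_pickle * 1e6:.1f} microseconds")
```

The exact numbers depend on the machine, so none are shown here. The point is only that for a worker this small, the bookkeeping around the call is not negligible compared with the work itself.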
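The following benchmark makes the 3 calls to formating in a loop that iterates 100 times and measures the elapsed time. It then repeats the same thing using a multiprocessing pool of size 3 with method Pool.map, and once more using method Pool.apply_async (for this example I would expect the two multiprocessing cases to have more or less the same running time):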
import multiprocessing as mp
import numpy as np
from functools import partial
import time

def formating(a, b):
    # Formating goes here
    x = np.sort(b)
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

# needed for Windows:
if __name__ == '__main__':
    Numbers = np.array([3, 4, 5, 7, 8, 10, 20])
    limit1 = np.array([0, 2, 5, 12, 15])
    limit2 = np.array([0, 2, 5, 12])
    limit3 = np.array([0, 2, 5, 12, 15, 22])

    TRIALS = 100

    # non-multiprocessing:
    t = time.time()
    for _ in range(TRIALS):
        result1 = formating(Numbers, limit1)
        result2 = formating(Numbers, limit2)
        result3 = formating(Numbers, limit3)
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

    # multiprocessing version 1 (using method map):
    # since first argument to formating is always the same:
    worker = partial(formating, Numbers)
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3) as pool:
            result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)

    # multiprocessing version 2 (using method apply_async)
    t = time.time()
    for _ in range(TRIALS):
        with mp.Pool(3) as pool:
            results = [pool.apply_async(formating, args=(Numbers, limit)) for limit in [limit1, limit2, limit3]]
            result1, result2, result3 = [result.get() for result in results]
    elapsed = time.time() - t
    print(result1, result2, result3, elapsed)
Prints:
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 0.00299835205078125
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 27.002381324768066
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 27.023000240325928
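Results
The multiprocessing versions run only about 9000 times more slowly (and there is no difference between using map and apply_async).
If I take the overhead of creating the pools out of the benchmark, things improve greatly: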
import multiprocessing as mp
import numpy as np
from functools import partial
import time

def formating(a, b):
    # Formating goes here
    x = np.sort(b)
    # digitize
    l = np.digitize(a, x)
    # output:
    result = np.bincount(l, weights=a)
    return result

# needed for Windows:
if __name__ == '__main__':
    Numbers = np.array([3, 4, 5, 7, 8, 10, 20])
    limit1 = np.array([0, 2, 5, 12, 15])
    limit2 = np.array([0, 2, 5, 12])
    limit3 = np.array([0, 2, 5, 12, 15, 22])

    TRIALS = 100

    # multiprocessing version 1 (using method map):
    # since first argument to formating is always the same:
    worker = partial(formating, Numbers)
    with mp.Pool(3) as pool:
        t = time.time()
        for _ in range(TRIALS):
            result1, result2, result3 = pool.map(worker, [limit1, limit2, limit3])
        elapsed = time.time() - t
    print(result1, result2, result3, elapsed)
Prints:
[ 0. 0. 7. 30. 0. 20.] [ 0. 0. 7. 30. 20.] [ 0. 0. 7. 30. 0. 20.] 0.32500314712524414
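But that is still 0.325 seconds compared with 0.003 seconds. This just shows you that the major overhead is in creating the pool, but you nevertheless have to create the pool and account for that overhead.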
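To put the three timings side by side, the short calculation below works out the slowdown factors and the approximate cost per loop iteration. It is only a sketch that uses the elapsed times printed above, rounded, and the variable names are mine.

```python
TRIALS = 100

# Elapsed seconds for 100 iterations, rounded from the printed output.
serial = 0.0030          # plain loop, no multiprocessing
pool_per_trial = 27.0    # a new pool created in every iteration
pool_reused = 0.325      # one pool created once and reused

slowdown_new_pool = pool_per_trial / serial
slowdown_reused = pool_reused / serial

print(f"new pool each iteration: about {slowdown_new_pool:,.0f}x slower, "
      f"{pool_per_trial / TRIALS * 1000:.0f} ms per iteration")
print(f"pool reused:             about {slowdown_reused:.0f}x slower, "
      f"{pool_reused / TRIALS * 1000:.2f} ms per iteration")
print(f"no multiprocessing:      {serial / TRIALS * 1000:.2f} ms per iteration")
```

In other words, creating a pool costs on the order of 270 ms each time on this machine, and even with the pool already in place each round of three tasks costs about 3 ms, against roughly 0.03 ms for doing the same work directly.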
So this is how you do it, but don't do it under these circumstances.