The Python standard library offers two options for multiprocessing: the multiprocessing module and concurrent.futures. The second adds a layer of abstraction on top of the first. For a simple map scenario like yours, the usage is very straightforward.
Here's something to experiment with:
import numpy as np
from time import time
from os import cpu_count
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

def string_to_float(string):
    return np.array(np.asarray(string.split()), dtype=float)

if __name__ == '__main__':
    # Example datasplit
    rng = np.random.default_rng()
    num_strings = 100000
    datasplit = [' '.join(str(n) for n in rng.random(50))
                 for _ in range(num_strings)]

    # Looping (sequential processing)
    start = time()
    matrix = []
    for i in range(len(datasplit)):
        matrix.append(np.array(np.asarray(datasplit[i].split()), dtype=float))
    print(f'Duration of sequential processing: {time() - start:.2f} secs')

    # Setting up multiprocessing
    num_workers = int(0.8 * cpu_count())
    chunksize = max(1, int(len(datasplit) / num_workers))

    # Multiprocessing with Pool
    start = time()
    with Pool(num_workers) as p:
        matrix = p.map(string_to_float, datasplit, chunksize)
    print(f'Duration of parallel processing (Pool): {time() - start:.2f} secs')

    # Multiprocessing with ProcessPoolExecutor
    start = time()
    with ProcessPoolExecutor(num_workers) as ppe:
        matrix = list(ppe.map(string_to_float, datasplit, chunksize=chunksize))
    print(f'Duration of parallel processing (PPE): {time() - start:.2f} secs')
You should play with num_workers and, more importantly, the chunksize variable. The values I used here have worked well for me in many situations. You can also let the system decide (both arguments are optional), but the result might be suboptimal, especially when the amount of data to process is large.
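For reference, when chunksize is omitted, CPython's Pool.map currently derives it from the input length by aiming for roughly four chunks per worker. This is an implementation detail that may change between versions; a minimal sketch of the heuristic:

```python
def default_pool_chunksize(n_items, n_workers):
    # Mirrors the heuristic in CPython's multiprocessing.pool:
    # split the work into about 4 chunks per worker, rounding up.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return max(1, chunksize)

print(default_pool_chunksize(100000, 8))  # 3125
```

With very large inputs this automatic value can be far from optimal, which is why it pays to set chunksize explicitly as above.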
For 10 million strings (your range) and chunksize=10000, my machine produced the following results:
Duration of sequential processing: 393.78 secs
Duration of parallel processing (Pool): 73.76 secs
Duration of parallel processing (PPE): 85.82 secs
PS: Why do you use np.array(np.asarray(string.split()), dtype=float) instead of np.asarray(string.split(), dtype=float)?
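To illustrate the PS: as far as I can tell, both expressions yield the same float64 array; the outer np.array just adds a pass over an intermediate string array. A quick check (with a made-up sample string):

```python
import numpy as np

s = '0.5 1.25 -3.0'
a = np.array(np.asarray(s.split()), dtype=float)  # two-step version from the question
b = np.asarray(s.split(), dtype=float)            # single conversion
print(np.array_equal(a, b))  # True
```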