【发布时间】:2021-01-29 21:19:37
【问题描述】:
我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后在每一行上处理一个任务。
顺序解决方案耗时太长,因此我开始研究如何并行化它。
我想出的第一个解决方案是使用 Process 并管理列表中的每个子进程。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
for piece in read_in_chunks(file, CHUNKSIZE):
jobs = []
piece_list = piece.splitlines()
piece_list_len = len(piece_list)
item_delta = round(piece_list_len/N_PROCESSES)
start = 0
for process in range(N_PROCESSES):
finish = start + item_delta
p = mp.Process(target=work, args=(piece_list[start:finish]))
start = finish
jobs.append(p)
p.start()
for job in jobs:
job.join()
它在大约 2498 毫秒内完成每个块。
然后我发现了 Pool 工具来自动管理切片。
import multiprocessing as mp
BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()
def read_in_chunks(file_object, chunk_size=1024):
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
with open(BIG_FILE_PATH, encoding="Latin-1") as file:
with mp.Pool(N_PROCESSES) as pool:
for piece in read_in_chunks(file, CHUNKSIZE):
piece_list = piece.splitlines()
pool.map(work, piece_list)
它在大约 15540 毫秒内完成每个块,比手动慢 6 倍,但仍然比顺序快。
我是否使用了错误的池? 有没有更好或更快的方法来做到这一点?
感谢您的阅读。
更新
正如 Hannu 建议的那样,游泳池的开销很大。
Process 方法调用的工作函数需要一个行列表。
由于 Pool 决定切片的方式,由 Pool 方法调用的工作函数需要一行。
我不太确定如何让池一次给某个工人多条线路。
那应该能解决问题吧?
更新 2
最后一个问题,还有第三种更好的方法吗?
【问题讨论】:
-
您正在循环中创建
Pool。因此,它被一次又一次地创建。在开始循环之前创建一次,如here所示。 -
哦不,我怎么看不到!谢谢,但运行时间不变。
标签: python performance multiprocessing python-multiprocessing pool