【发布时间】:2025-12-29 06:15:07
【问题描述】:
我正在使用多处理队列来处理我的记录。
queue = multiprocessing.Queue()
def produce(i, item):
data = process(i, item)
queue.put(data)
def process(item):
data = do_processing(item)
return data
if __name__ == '__main__':
records = load_records()
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
print('produce items')
for i, item in ennumerate(records.items()):
executor.submit(produce, i, item)
print('queue size:{}'.format(queue.qsize()))
while not queue.empty():
save(queue.get())
队列最初工作得很快。但随着队列的增长,变得非常缓慢。
根据其他 SO 答案,我也尝试使用 multiprocessing.Manager().Queue(),但没有成功。
欢迎指点。
【问题讨论】:
-
您需要在提交时将项目从队列中拉出,这样队列就不会变得太大。您可以使用线程来执行此操作,或者在这种情况下我会说使用
multiprocessing.Pool.imap(它可以为您提交任务并将结果检索到线程)。恕我直言,concurrent.futures没有比multiprocessing.Pool更好的产品了。 -
@Aaron 如果我使用
multiprocessing.Pool.imap如何获得i? -
enumerate 返回一个迭代器,您可以调用 map 或 imap...
-
@Aaron 你能详细说明一下答案吗?真的很感激。
-
我还应该指出,如果您的任务需要一段时间才能运行,并且您设法在完成处理之前清除队列,则使用
while not queue.empty()可能会丢失结果。通常最好确切地知道你应该从队列中get有多少项目,或者从工作人员那里发送某种哨兵以表明不会有更多数据到来。
标签: python-3.x multithreading multiprocessing python-multiprocessing python-multithreading