[Posted]: 2020-10-01 11:51:09
[Question]:
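I found several other questions on this topic, but none of them closely matches my situation.
I have several very large text files (over 3 GB each).
I want to process them in parallel (say, 2 documents at a time) using multiprocessing. As part of the processing (within a single process), I need to make API calls, so I want each process to have its own threads that run asynchronously.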
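One way to structure this, sketched under stated assumptions: instead of hand-managed Queue and Thread objects, use the standard library's concurrent.futures, with a ProcessPoolExecutor across files and a ThreadPoolExecutor inside each process for the API calls. call_api is a hypothetical placeholder for the real API request, and iter_batches, process_lines, process_one_file and process_files are illustrative names, not part of the question. The max_in_flight cap exists because Executor.map() collects its input iterable immediately rather than lazily, which for a 3 GB file would mean reading everything up front.

```python
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, ThreadPoolExecutor, wait
from itertools import islice
import time


def call_api(batch):
    # Hypothetical stand-in for the real API call: pretend to do I/O, report the batch size.
    time.sleep(0.01)
    return len(batch)


def iter_batches(lines, batch_size):
    # Lazily group any iterable (e.g. an open file) into lists of at most batch_size items.
    it = iter(lines)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def process_lines(lines, batch_size=250, num_threads=4, max_in_flight=16):
    # Runs inside one worker process: fan batches out to a thread pool, keeping at most
    # max_in_flight batches pending so a multi-GB file is never held in memory at once.
    total = 0
    in_flight = set()
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for batch in iter_batches(lines, batch_size):
            if len(in_flight) >= max_in_flight:
                # Block until at least one API call has finished before reading more lines.
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                total += sum(f.result() for f in done)
            in_flight.add(pool.submit(call_api, batch))
        done, _ = wait(in_flight)  # default return_when is ALL_COMPLETED
        total += sum(f.result() for f in done)
    return total


def process_one_file(path, **kwargs):
    # Iterating over a text file yields one line at a time, so the file is streamed.
    with open(path, encoding="utf-8") as f:
        return process_lines(f, **kwargs)


def process_files(paths, num_processes=2):
    # One worker process per file (at most num_processes at a time); each process
    # owns its own thread pool. Call this under an if __name__ == "__main__": guard.
    with ProcessPoolExecutor(max_workers=num_processes) as procs:
        return list(procs.map(process_one_file, paths))
```

Called as process_files(["a.txt", "b.txt"]) from inside an if __name__ == "__main__": block, this would return one line count per file; the file names are placeholders.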
I came up with a simplified example (I have commented the code to try to explain what I think it should do):
```python
import multiprocessing
from threading import Thread
import threading
from queue import Queue
import time


def process_huge_file(*, file_, batch_size=250, num_threads=4):
    # create an APICaller instance for each process, each with its own Queue
    api_call = APICaller()
    batch = []

    # create threads that will run asynchronously to make API calls
    # I expect these to block immediately, since there is nothing in the Queue yet (which is
    # what api_call.run depends on to make a call)
    threads = []
    for i in range(num_threads):
        thread = Thread(target=api_call.run)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()
    ####

    # start processing the file line by line
    for line in file_:
        # if we are at our batch size, add the batch to api_call to let the threads do
        # their API calling
        if i % batch_size == 0:
            api_call.queue.put(batch)
        else:
            # add fake line to batch
            batch.append(fake_line)


class APICaller:
    def __init__(self):
        # thread-safe queue to feed the threads, which point at instances
        # of these APICaller objects
        self.queue = Queue()

    def run(self):
        print("waiting for something to do")
        self.queue.get()
        print("processing item in queue")
        time.sleep(0.1)
        print("finished processing item in queue")


if __name__ == "__main__":
    # fake docs
    fake_line = "this is a fake line of some text"
    # two fake docs, each 1000 lines long
    fake_docs = [[fake_line] * 1000 for i in range(2)]
    ####

    num_processes = 2
    procs = []
    for idx, doc in enumerate(fake_docs):
        proc = multiprocessing.Process(target=process_huge_file, kwargs=dict(file_=doc))
        proc.start()
        procs.append(proc)

    for proc in procs:
        proc.join()
```
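As the code stands, "waiting for something to do" prints 8 times (which makes sense: 4 threads per process), and then it stops, or "deadlocks". That is not what I expected: I expected the threads to start working as soon as I start putting items into the queue, but the code never seems to get that far. Normally I would step through the code to find the hang-up, but I still don't have a good understanding of how best to debug with threads (another topic for another day).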
In the meantime, can someone help me figure out why my code is not doing what it should?
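The hang can be reproduced in isolation. Thread.join() blocks until the thread's target function returns; here the target, APICaller.run, is blocked in self.queue.get() waiting for an item; and the only code that puts items into the queue is the for line in file_ loop, which runs after the joins. Each process therefore waits for threads that are in turn waiting for it, which matches the 8 prints followed by silence. The hypothetical helper join_before_put below adds a join timeout (the original code has none) so the stuck state can be observed instead of hanging forever:

```python
import threading
from queue import Queue


def join_before_put(timeout=0.5):
    # Mirror the ordering in process_huge_file: start a consumer, join it, only then put.
    q = Queue()
    # Like APICaller.run, the worker's first action is a blocking q.get().
    worker = threading.Thread(target=q.get, daemon=True)
    worker.start()
    # The original code joins with no timeout here and so waits forever; with a
    # timeout, join() gives up and we can check that the worker is still stuck.
    worker.join(timeout)
    stuck_before_put = worker.is_alive()
    # The put that the original code never reaches: it releases the worker.
    q.put("batch")
    worker.join()
    return stuck_before_put, worker.is_alive()


print(join_before_put())  # (True, False)
```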
[Discussion]:
- In the process_huge_file function, join the threads after the for line in file_ loop instead of before it.
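That change is necessary but, with the code exactly as posted, not sufficient: after the thread-creation loop, i is left at num_threads - 1 (that is, 3), so i % batch_size is never 0 and nothing is ever queued; run() returns after a single get(), so each thread handles at most one batch; and batch is never reset after being queued. Below is a revised sketch of the question's process_huge_file and APICaller that starts the threads, feeds the queue, sends one stop marker per thread, and only then joins. The _SENTINEL marker and the processed list are additions for illustration, and time.sleep stands in for the real API call.

```python
import threading
import time
from queue import Queue

_SENTINEL = None  # illustrative "no more work" marker; one is queued per thread


class APICaller:
    def __init__(self):
        self.queue = Queue()
        self.processed = []  # sizes of the batches handled, recorded for illustration
        self._lock = threading.Lock()

    def run(self):
        # Keep consuming until the producer signals that no more batches are coming.
        while True:
            batch = self.queue.get()
            if batch is _SENTINEL:
                break
            time.sleep(0.01)  # stand-in for the real API call
            with self._lock:
                self.processed.append(len(batch))


def process_huge_file(*, file_, batch_size=250, num_threads=4):
    api_call = APICaller()
    threads = [threading.Thread(target=api_call.run) for _ in range(num_threads)]
    for thread in threads:
        thread.start()

    # Produce: the batch length itself decides when to hand a batch to the threads.
    batch = []
    for line in file_:
        batch.append(line)
        if len(batch) == batch_size:
            api_call.queue.put(batch)
            batch = []  # start a fresh list instead of growing the queued one
    if batch:
        api_call.queue.put(batch)  # don't drop the final partial batch

    # FIFO order means every batch is dequeued before any sentinel; join only now.
    for _ in threads:
        api_call.queue.put(_SENTINEL)
    for thread in threads:
        thread.join()
    return api_call.processed
```

For example, process_huge_file(file_=["line"] * 1000) returns four entries of 250 (in whatever order the threads finish). The sketch also appends line itself rather than the global fake_line, which is defined only under the if __name__ == "__main__": guard and so would not exist in a child process started with the spawn start method (the default on Windows and, since Python 3.8, on macOS).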
Tags: python multithreading multiprocessing python-multiprocessing python-multithreading