【Posted】: 2017-10-02 08:46:39
【Question】:
I'm having a problem with multiprocessing in Python. In the code below I spawn 7 workers (multiprocessing.Process) plus one result threading.Thread. Before and after processing the data (extracting some metadata from files), I run:
lsof | grep ' <user> ' | grep 'python3'
and I see some open handles:
python3 17291 ivo DEL REG 0,20 5288943 /dev/shm/ZMcs2H
python3 17291 ivo DEL REG 0,20 5288942 /dev/shm/3iMR4q
python3 17291 ivo DEL REG 0,20 5288941 /dev/shm/XPYh79
When the multiprocessing step is run many times in a loop (processing a series of incoming messages), I eventually get
OSError: [Errno 24] Too many open files
Am I doing something wrong with the multiprocessing package?
def worker_process_results(meta_queue, res_dict):
    while True:
        try:
            (path, meta) = meta_queue.get()
            res_dict[path] = meta
        finally:
            meta_queue.task_done()
def multiprocess_get_metadata(paths, thread_count = 7):
    """ Scan files for metadata (multiprocessing). """
    file_queue = multiprocessing.JoinableQueue()
    meta_queue = multiprocessing.JoinableQueue()
    res_dict = dict()

    # result thread
    meta_thread = threading.Thread(target = lambda: worker_process_results(meta_queue, res_dict))
    meta_thread.daemon = True
    meta_thread.start()

    workers = []
    for _ in range(0, min(thread_count, len(paths))):
        worker = MetaDataWorker(file_queue, meta_queue)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for path in paths:
        file_queue.put(path)

    file_queue.join()
    meta_queue.join()

    for x in workers:
        x.terminate()

    return res_dict
class MetaDataWorker(multiprocessing.Process):
    ''' Use library to get meta data from file. '''

    def __init__(self, file_queue, meta_queue):
        ''' Constructor. '''
        super().__init__()
        self.file_queue = file_queue
        self.meta_queue = meta_queue

    def run(self):
        """ Run. """
        while True:
            try:
                path = self.file_queue.get()
                meta = getmetadata(path)
                meta = None
                self.meta_queue.put((path, meta))
            except Exception as err:
                print("Thread end.")
                print("{0}".format(err))
            finally:
                self.file_queue.task_done()
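For context: each multiprocessing.JoinableQueue allocates OS-level pipes and shared-memory semaphores (the /dev/shm entries shown by lsof above), and the code above creates two fresh queues plus a set of processes on every call, which can exhaust the file-descriptor limit when called in a loop. A minimal sketch of the same scan built on multiprocessing.Pool, whose resources are released when the with block exits (the getmetadata stub is an assumption standing in for the real extractor):

```python
import multiprocessing

def getmetadata(path):
    # Hypothetical stand-in for the real metadata extractor.
    return {"path": path, "size": len(path)}

def multiprocess_get_metadata_pooled(paths, process_count=7):
    """Scan files for metadata using a pool of worker processes."""
    with multiprocessing.Pool(processes=process_count) as pool:
        metas = pool.map(getmetadata, paths)
    # The pool's internal queues and pipes are closed when the
    # 'with' block exits, so repeated calls do not accumulate
    # open handles the way per-call JoinableQueues can.
    return dict(zip(paths, metas))

if __name__ == "__main__":
    print(multiprocess_get_metadata_pooled(["a.txt", "bb.txt"]))
```

This is a sketch, not the asker's code; it trades the explicit worker/result-thread wiring for pool.map, which collects results without a separate result queue.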
【Comments】:
-
You say "7 workers", but the code above creates one child process per incoming path.
-
No -> only if there are too few paths -> for _ in range(0, min(thread_count, len(paths)))
Tags: python python-3.x python-multiprocessing python-multithreading