【Question Title】: Python multiprocessing and too many open files
【Posted】: 2017-10-02 08:46:39
【Question Description】:

I'm running into a problem with multiprocessing in Python. In the code below, I spawn 7 workers (multiprocessing.Process) and one result thread (threading.Thread). Before and after processing the data (extracting some metadata from files), I run:

lsof | grep ' <user> ' | grep 'python3'

and I see some open handles:

python3   17291              ivo  DEL       REG               0,20             5288943 /dev/shm/ZMcs2H
python3   17291              ivo  DEL       REG               0,20             5288942 /dev/shm/3iMR4q
python3   17291              ivo  DEL       REG               0,20             5288941 /dev/shm/XPYh79

When I run the multiprocessing step several times in a loop (processing a series of messages), I eventually get:

OSError: [Errno 24] Too many open files

Is there something wrong with how I'm handling the multiprocessing package?

import multiprocessing
import threading

def worker_process_results(meta_queue, res_dict):
    while True:
        try:
            (path, meta) = meta_queue.get()
            res_dict[path] = meta
        finally:
            meta_queue.task_done()

def multiprocess_get_metadata(paths, thread_count = 7):
    """ Scan files for metadata (multiprocessing). """
    file_queue = multiprocessing.JoinableQueue()
    meta_queue = multiprocessing.JoinableQueue()

    res_dict   = dict()
    # result thread    
    meta_thread = threading.Thread(target = lambda: worker_process_results(meta_queue, res_dict))
    meta_thread.daemon = True
    meta_thread.start()

    workers = []

    for _ in range(0, min(thread_count, len(paths))):
        worker = MetaDataWorker(file_queue, meta_queue)
        worker.daemon = True
        worker.start()        
        workers.append(worker)

    for path in paths:
        file_queue.put(path)

    file_queue.join()
    meta_queue.join()

    for x in workers:
        x.terminate()

    return res_dict

class MetaDataWorker(multiprocessing.Process):
    ''' Use library to get metadata from file. '''

    def __init__(self, file_queue, meta_queue):
        ''' Constructor. '''
        super().__init__()

        self.file_queue = file_queue
        self.meta_queue = meta_queue

    def run(self):
        """ Run. """

        while True:
            try:
                path = self.file_queue.get()
                meta = getmetadata(path)
                meta = None
                self.meta_queue.put((path, meta))
            except Exception as err:
                print("Thread end.")
                print("{0}".format(err))
            finally:
                self.file_queue.task_done()

【Question Discussion】:

  • You say "7 workers", but the code above creates one child process per incoming path.
  • No -> only when I have too few paths -> for _ in range(0, min(thread_count, len(paths)))

Tags: python python-3.x python-multiprocessing python-multithreading


【Solution 1】:

Solved it already. I needed to send end signals to the worker processes and to the result thread to stop their never-ending loops.
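Below is a minimal sketch of that sentinel approach, assuming a SENTINEL marker, a stub getmetadata(), and a process_count parameter; none of these names come from the original post, and the stub stands in for the real metadata extractor. Once every loop exits on its own, the processes can be join()ed instead of terminate()d, so no queue resources are left dangling:

import multiprocessing
import threading

SENTINEL = None  # hypothetical end-of-work marker

def getmetadata(path):
    """Hypothetical stand-in for the real metadata extractor."""
    return {"path": path}

class MetaDataWorker(multiprocessing.Process):
    """Extract metadata from files taken off the queue."""

    def __init__(self, file_queue, meta_queue):
        super().__init__()
        self.file_queue = file_queue
        self.meta_queue = meta_queue

    def run(self):
        while True:
            path = self.file_queue.get()
            try:
                if path is SENTINEL:
                    break  # end signal: leave the loop so the process can exit
                self.meta_queue.put((path, getmetadata(path)))
            except Exception as err:
                print("{0}".format(err))
            finally:
                self.file_queue.task_done()  # runs even on break

def worker_process_results(meta_queue, res_dict):
    while True:
        item = meta_queue.get()
        try:
            if item is SENTINEL:
                break  # end signal for the result thread
            path, meta = item
            res_dict[path] = meta
        finally:
            meta_queue.task_done()

def multiprocess_get_metadata(paths, process_count=7):
    """ Scan files for metadata (multiprocessing). """
    file_queue = multiprocessing.JoinableQueue()
    meta_queue = multiprocessing.JoinableQueue()
    res_dict = dict()

    meta_thread = threading.Thread(
        target=worker_process_results, args=(meta_queue, res_dict))
    meta_thread.start()

    n_workers = min(process_count, len(paths))
    workers = []
    for _ in range(n_workers):
        worker = MetaDataWorker(file_queue, meta_queue)
        worker.start()
        workers.append(worker)

    for path in paths:
        file_queue.put(path)

    # one sentinel per worker, so every run() loop terminates
    for _ in range(n_workers):
        file_queue.put(SENTINEL)

    file_queue.join()   # all paths (and sentinels) consumed
    meta_queue.join()   # all results stored in res_dict

    meta_queue.put(SENTINEL)  # stop the result thread as well
    meta_thread.join()

    for w in workers:
        w.join()  # processes exit on their own; no terminate() needed

    return res_dict

if __name__ == "__main__":
    print(multiprocess_get_metadata(["a.txt", "b.txt", "c.txt"]))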

【Discussion】:

  • It would be great if you posted the solution (code) for people who have a similar problem.