【Title】: Parallel queueing - Multiprocessing pool, python
【Posted】: 2012-01-23 08:28:51
【Question】:

My goal is to walk a directory tree and compute the MD5 of every file in it. I adapted the code from a similar question:

Parallel file matching, Python

import os
import re
import sys
import time
import md5

from stat import S_ISREG

import multiprocessing

global queue
size_limit = 500000

target = sys.argv[1]



############Analysis and Multiprocessing####################

def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and small enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    fp = open(fname, 'rt')
    # read the whole file at once
    contents = fp.read()
    hash = md5.md5(contents)
    global queue
    print "enqueue"
    queue.put(fname+'-'+hash.hexdigest())

################MAIN MAIN MAIN#######################

#kick off processes to md5 the files and wait till completion

queue = multiprocessing.Queue()
pool = multiprocessing.Pool()
pool.map(worker_search_fn, files_to_search(target))
pool.close()
pool.join()

#Should be done, now let's do our analysis
while not queue.empty():
    print queue.get()

I added the "print enqueue" statement for debugging purposes, and I noticed that the code does lock up when recursing a large directory tree. I am not sure whether two processes trying to access the queue at the same time is causing a deadlock.

Perhaps there is a better way to do this? The structure does not have to be a queue, but it must be lock-free to take full advantage of multiprocessing. I want to recurse and MD5 a directory in parallel, and once that is done, do something with the whole list. For debugging I just print the finished queue. Any suggestions?

【Question comments】:

  • What is wrong with the code you already have, does it fail in some way?

Tags: python multithreading parallel-processing multiprocessing


【Solution 1】:

It is not clear whether your program is I/O-bound or CPU-bound. If the task is I/O-bound, a single process might perform better than several, for example by minimizing the number of disk seeks. You can check this by specifying different values for nprocesses (below) and seeing what gives better results in your case.
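A minimal sketch of that measurement loop (not part of the original answer): it uses a thread-backed pool from multiprocessing.dummy so it runs anywhere, and hypothetical in-memory payloads in place of real files, so the absolute numbers only illustrate the harness, not disk behavior:

```python
import hashlib
import time
from multiprocessing.dummy import Pool  # thread pool; same API as multiprocessing.Pool

def md5_bytes(data):
    """Hash one in-memory payload (stands in for one file's contents)."""
    return hashlib.md5(data).hexdigest()

# Hypothetical workload: 50 payloads of ~100 KB each.
payloads = [bytes([i % 256]) * 100_000 for i in range(50)]

timings = {}
for nprocesses in (1, 2, 4):
    pool = Pool(nprocesses)
    start = time.time()
    digests = pool.map(md5_bytes, payloads)
    timings[nprocesses] = time.time() - start
    pool.close()
    pool.join()
```

Comparing timings[1], timings[2], and timings[4] on the real directory tree would show whether more workers actually help.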

You don't need a queue in this case:

#!/usr/bin/env python
import os
import sys

from hashlib         import md5
from multiprocessing import Pool, freeze_support
from stat            import S_ISREG

def walk_files(topdir):
     """yield up full pathname for each file in tree under topdir"""
     for dirpath, dirnames, filenames in os.walk(topdir):
         for fname in filenames:
             pathname = os.path.join(dirpath, fname)
             yield pathname

def files_to_process(topdir, size_limit):
    """yield up full pathname for only files we want to process"""
    for fname in walk_files(topdir):
        try: sr = os.stat(fname)
        except OSError: pass
        else:
            # if it is a regular file and small enough, we want to process it
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname

def md5sum(fname):
    with open(fname, 'rb') as fp:
        # read all file at once
        contents = fp.read()
        hash = md5(contents)
        return fname, hash.hexdigest()

def main(argv=None):
    if argv is None:
        argv = sys.argv
    topdir = argv[1]
    size_limit = 500000
    nprocesses = 1

    pool = Pool(processes=nprocesses)
    files = files_to_process(topdir, size_limit)
    for fname, hexdigest in pool.imap_unordered(md5sum, files):
        print("%s\t%s" % (fname, hexdigest))

if __name__=="__main__":
    freeze_support()
    main()

Example

$ python md5sum.py .
./md5sum.py 9db44d3117673790f1061d4b8f00e8ce
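The md5sum() above reads each file into memory in one go, which is fine under the 500000-byte size_limit. As a side note (a sketch, not part of the original answer), a chunked variant keeps memory bounded if you ever raise that limit; md5sum_chunked is a hypothetical name:

```python
import hashlib
import os
import tempfile

def md5sum_chunked(fname, chunk_size=1 << 20):
    """Return (fname, hexdigest), reading the file in chunks so memory stays bounded."""
    h = hashlib.md5()
    with open(fname, 'rb') as fp:
        # iter() with a sentinel calls fp.read() until it returns b'' at EOF
        for chunk in iter(lambda: fp.read(chunk_size), b''):
            h.update(chunk)
    return fname, h.hexdigest()

# Quick self-check against hashing the same bytes in one go.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'hello world\n' * 1000)
    path = tmp.name
name, digest = md5sum_chunked(path, chunk_size=4096)
os.unlink(path)
```

It drops in wherever md5sum() is used, since both return the same (fname, hexdigest) pair.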

【Comments】:

【Solution 2】:

Because walk_files() takes a long time on a large directory tree, it is slowness, not a deadlock.

Also...

Remove pool.join()

multiprocessing.Pool().map() blocks until the results are ready, so you don't need pool.join().
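To see why the join is redundant, here is a small demonstration (using the thread-backed pool from multiprocessing.dummy, which shares the Pool API): results is fully populated the moment map() returns, before close() or join() is ever called.

```python
import time
from multiprocessing.dummy import Pool  # thread-backed pool, same API as multiprocessing.Pool

def slow_double(x):
    time.sleep(0.05)
    return 2 * x

pool = Pool(4)
start = time.time()
results = pool.map(slow_double, range(8))  # blocks here until every result is ready
elapsed = time.time() - start
pool.close()
pool.join()  # returns immediately: map() already waited for the workers
```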

【Comments】:
