【问题标题】:Multiprocessing - Iteratively loop through files in folders多处理 - 迭代循环浏览文件夹中的文件
【发布时间】:2021-07-10 12:23:47
【问题描述】:

我必须遍历 30 个 zip 文件夹,每个 zip 文件夹有 50,000 - 90,000 个 jpeg 文件。理想情况下,我会遍历每个 zip 文件夹,因为解压缩每个文件夹会花费太长时间。对于每个文件,我需要打开每个文件,从中提取关键信息,并将信息存储到一个列表中。基于How to do multithreading on a folder with several files?,我尝试启用多处理以使事情变得更快,但是,我无法弄清楚。在下面的示例中,我目前正试图让它与一个文件夹一起工作,然后我需要弄清楚如何让它遍历所有 30 个 zip 文件夹。

import os
from zipfile import ZipFile

data_list = []
 
def image_processor(file):
    with ZipFile("files101.zip") as zip_file:
        with zip_file.open(file, "r") as img_file:
            img_data = img_file.readlines(1) # data is available in beginning of each file
            
            # Extract data #1
            pattern_1 = r'IMG:\d{,3}'
            if re.findall(pattern_1, str(img_data)):
                img_extract = re.findall(pattern_1, str(img_data))[0]
            else:
                img_extract = np.nan

            # Extract timestamp
            time_pattern = r'Time:\s\d{2}-\d{2}-\d{4}\s\s\d{2}:\d{2}:\d{2}'
            if re.findall(time_pattern, str(img_data)):
                time_extract = re.findall(time_pattern, str(img_data))[0]
            else:
                time_extract = np.nan

            # Create list   
            return data_list.append([img_extract, time_extract])

os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
for folder in os.listdir():
    file_list = ZipFile("files101.zip", "r").namelist()

    with ProcessPool(processes=8) as pool:
        pool.map(image_processor, file_list)

发生的情况是我的代码永远运行,就像它没有启用多处理一样。如果我需要做多线程,我有六个核心。任何建议将不胜感激。

【问题讨论】:

    标签: python for-loop multiprocessing zip


    【解决方案1】:

    您缺少几个导入。但我马上注意到的这些主要项目是:

    1. for folder in os.listdir(): 您正在循环当前目录中的每个文件和目录,但循环没有引用这些文件/目录中的任何一个;您正在重复处理 files101.zip
    2. 根据您的chdir 命令,您似乎正在Windows 下运行。创建新进程的代码必须在 if __name__ == '__main__': 块内。
    3. 处理池中的每个进程都有自己的地址空间,并且会附加到data_list 的不同实例。将data_list 返回到主进程并让主进程将所有返回值附加到主列表将起作用,但必须确保您的image_processor 函数以空data_list 开始每个调用。

    我假设您要处理当前目录中所有扩展名为 .zip 的文件(其中大约有 30 个)。我会修改您的处理,以便每个提交到处理池工作单元的任务不是 zip 存档中的单个文件(在这种情况下,您将提交 30 * 50,000 个任务),而是整个存档。所以你的主要处理函数不再是image_processor,而是zip_processor。我对该文件进行了一些其他更改,这对我来说很有意义(我希望我没有破坏任何东西):

    import os
    import glob
    from zipfile import ZipFile
    import re
    import numpy as np
    from multiprocessing import Pool
    
    def zip_processor(zipfile):
        with ZipFile(zipfile) as zip_file:
            data_list = []
            for file in zip_file.namelist():
                with zip_file.open(file, "r") as img_file:
                    # take element 0 from returned list and convert to a string
                    img_data = img_file.readlines(1)[0].decode() # data is available in beginning of each file
                    # Extract data #1
                    pattern_1 = r'IMG:\d{,3}'
                    # why do findall if you are only using the first occurrence?
                    m = re.search(pattern1, img_data)
                    img_extract = m.group(0) if m else np.nan
        
                    # Extract timestamp
                    time_pattern = r'Time:\s\d{2}-\d{2}-\d{4}\s\s\d{2}:\d{2}:\d{2}'
                    m = re.search(time_pattern, img_data)
                    time_extract = m.group(0) if m else np.nan
        
                    # Create list   
                    data_list.append([img_extract, time_extract])
            return data_list
                
    
    # required for Windows:
    if __name__ == '__main__':
        os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
    
        # Default pool size:
        with Pool() as pool:
            results = pool.imap(zip_processor, glob.iglob('*.zip'))
            data_list = []
            for result in results:
                data_list.extend(result)
    

    现在由于涉及大量 I/O,因此使用多线程可能会运行得很好,在这种情况下,更大的池大小将是有利的。进行以下更改:

    #from multiprocessing import Pool
    from multiprocessing.pool import ThreadPool
    ... # etc.
    
    if __name__ == '__main__':
        os.chdir(r"C:\\Users\\xxxxxx\\Desktop\\zip")
    
        zip_list = glob.glob('*.zip')
        with ThreadPool(len(zip_list)) as pool:
            results = pool.imap(zip_processor, zip_list)
            ... # etc.
    

    【讨论】:

    • 嗨,Booboo,感谢您的反馈,我会仔细检查并报告。我有一个关于img_file.readlines(1)[0].decode() 的问题。在我的测试中,我能够使用img_file.readlines(1),然后获得必要的数据。为什么需要[0].decode()
    • 那么您必须运行 Python 2。在 Python 3 上,img_file.readlines(1)[0] 将返回一个 字节字符串,而不是字符串(在 Python 2 中,'abc'b'abc' 之间没有区别,但对于 Python 而言并非如此3)。因此,当您执行re.search(pattern1, img_data) 其中pattern1str 类并且img_databytes 类时,您会收到错误消息。调用 decode 会使用当前默认编码(通常为 utf8)将字节字符串转换为 unicode 字符串。如果您是 Python 2,如果您不希望匹配的 unicode 结果,请省略对 decode() 的调用。
    • if 有img_file.readlines(1)[0] 而不是img_file.readlines(1) 的原因是因为readlines 返回一个list,我只对对第一个元素进行模式匹配感兴趣。您的代码将列表转换为字符串,因此您正在对以 '[' 开头并以 ']' 结尾的转换后的字符串进行模式匹配。因此,如果第一个元素的值为'abc',我将只针对字符串'abc' 进行模式匹配,但您将对字符串"['abc']" 进行模式匹配,因为str(['abc']) == "['abc']"。即使它有效,也很愚蠢。
    • 嗨,Booboo,我实现了你的代码。我正在使用 Python 3。当我尝试使用 .decode 时出现错误。没关系,img_file.readlines(1)[0] 按照您上面的评论工作。我一直在测试三个 zip 文件夹。不幸的是,多处理似乎无限运行。多线程代码有效,但它比我应用 for 循环来提取数据要慢。我被告知会有 300 多个 zip 文件(每个 50-90k 图像)而不是原来的 30 个(每个 50-90k 图像),所以多线程可以很好地加快速度。
    • 不要调用 decode 但我认为你需要使用一个字节字符串作为你的正则表达式模式。而且我确实认为您的问题更适合多线程,正如我在第二个代码示例中指出的那样。
    猜你喜欢
    • 2010-12-20
    • 1970-01-01
    • 1970-01-01
    • 2015-12-07
    • 1970-01-01
    • 2020-09-05
    • 2012-05-09
    • 1970-01-01
    相关资源
    最近更新 更多