【问题标题】:Progress bar with multiprocessing多处理进度条
【发布时间】:2023-04-01 14:38:01
【问题描述】:

我使用 multiprocessing 包来运行函数:run_performance,它会在其中加载包含多个 csv 文件的 zip 文件。

我搜索以正确显示进度条,其中包含每个 zip 文件中的 csv 数量。 使用我的代码,显示不连贯/错误:

我的代码:

from alive_progress import alive_bar
from zipfile import ZipFile
import os

def get_filepaths(directory):
    file_paths = []  # List which will store all of the full filepaths.
    # Walk the tree.
    for root, directories, files in os.walk(directory):
        for filename in files:
            # Join the two strings in order to form the full filepath.
            filepath = os.path.join(root, filename)
            file_paths.append(filepath)  # Add it to the list.
    return file_paths  # Self-explanatory.

def count_files_7z(myarchive):
   cnt_files = []
   with closing(ZipFile(myarchive)) as archive:
      for csv in archive.namelist():
         cnt_files.append(csv)
      return cnt_files

def run_performance(zipobj):
   zf = zipfile.ZipFile(zipobj)
   cnt = count_files_7z(zipobj)
   with alive_bar(len(cnt)) as bar:
      for f in zf.namelist():
         bar()
         with zf.open(f) as myfile:
            print(myfile) # and done other things

list_dir = "path_of_zipfiles" #

 for idx1, folder in enumerate(list_dir):
    get_all_zips = get_filepaths(folder)
    for idx2, zip_file in enumerate(get_all_zips):
       with zipfile.ZipFile(zip_file) as zipobj:
          p = Process(target=run_performance,args=(zipobj.filename,))
          p.start()
     p.join()

我的展示:

|████▌                                   | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌                                   | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s)|████▌                                   | ▄▆█ 1/9 [11%] in 0s (3.3/s, eta: 0s
...

如果我将p.join() 行与p.start() 放在相同的缩进处,则显示正确,但多处理不再起作用。 所以脚本花费了太多时间:

1 分 18 秒与 0 分 14 秒

期望的输出:

|████████████████████████████████████████| 1/1 [100%] in 2.4s (0.41/s)
|████████████████████████████████████████| 2/2 [100%] in 4.7s (0.43/s)
|████████████████████                    | ▄▂▂ 1/2 [50%] in 2s (0.6/s, eta: 0s)

【问题讨论】:

  • 如果您想同时运行多个进程,我建议您使用multiprocessing.Pool 并调用Pool.join(),而不是单独加入任何进程。
  • 我想我知道会发生什么。请在`p.start()`前加上print()
  • 请找到:[root@vm-grafana bin]# time ./02-pickle-client.py 一些空格.. 正在加载 csv 文件 |████████████ ████████| 2/2 [100%] in 0.8s (2.41/s) 加载 csv 文件 |████████████████████| 1/1 [100%] in 0.9s (1.09/s) 加载 csv 文件 |████████████████████| 2.1 秒内 2/2 [100%] (0.96/s)
  • 但是显示应该会更好,只是因为打印了很多空间.. 什么附加?

标签: python multiprocessing progress-bar


【解决方案1】:

首先是一些关于您的代码的通用方法。在您的主进程中,您使用文件路径打开 zip 存档只是为了取回原始文件名。这真的没有太多意义。然后在count_files_7z 中迭代zf.namelist() 的返回值以在zf.namelist() 已经是这些文件的列表时构建存档中的文件列表。这也没有太大意义。您还可以使用上下文管理器功能 closing 来确保存档在块的末尾关闭,但 with 块本身是一个上下文管理器,用于相同的目的。

我尝试安装 alive-progress,但进度条一团糟。这是一项更适合多线程而不是多处理的任务。实际上,它可能更适合串行处理,因为对磁盘执行并发 I/O 操作(除非它是固态驱动器)可能会损害性能。如果您读取的文件涉及大量 CPU 密集型处理,您将获得性能。如果是这种情况,我已经向每个线程传递了一个多处理池,您可以在其中执行对 apply 的调用,指定您在其中放置 CPU 密集型代码的函数。但是当在多线程而不是多处理下完成时,进度条应该会更好。即便如此,我也无法通过 alive-progress 获得任何体面的显示,诚然我并没有在这上面花费太多时间。所以我转而使用 PyPi 存储库中提供的更常见的 tqdm 模块。

即使使用 tqdm 也存在一个问题,即当进度条达到 100% 时,tqdm 必须编写一些内容(换行符?)以重新定位其他进度条.因此,我所做的是指定leave=False,这会导致条形图在达到 100% 时消失。但至少你可以看到所有的进度条在进行时不会失真。

from multiprocessing.pool import Pool, ThreadPool
from threading import Lock
import tqdm
from zipfile import ZipFile
import os
import heapq

def get_filepaths(directory):
    file_paths = []  # List which will store all of the full filepaths.
    # Walk the tree.
    for root, directories, files in os.walk(directory):
        for filename in files:
            # Join the two strings in order to form the full filepath.
            filepath = os.path.join(root, filename)
            file_paths.append(filepath)  # Add it to the list.
    return file_paths  # Self-explanatory.


def get_free_position():
    """ Return the minimum possible position """
    with lock:
        free_position = heapq.heappop(free_positions)
    return free_position

def return_free_position(position):
    with lock:
        heapq.heappush(free_positions, position)

def run_performance(zip_file):
    position = get_free_position()
    with ZipFile(zip_file) as zf:
        file_list = zf.namelist()
        with tqdm.tqdm(total=len(file_list), position=position, leave=False) as bar:
            for f in file_list:
                with zf.open(f) as myfile:
                    ... # do things with myfile (perhaps myfile.read())
                    # for CPU-intensive tasks: result = pool.apply(some_function, args=(arg1, arg2, ... argn))
                    import time
                    time.sleep(.005) # simulate doing something
                bar.update()
    return_free_position(position)

def generate_zip_files():
    list_dir = ['path1', 'path2']
    for folder in list_dir:
        get_all_zips = get_filepaths(folder)
        for zip_file in get_all_zips:
            yield zip_file

# Required for Windows:
if __name__ == '__main__':
    N_THREADS = 5
    free_positions = list(range(N_THREADS)) # already a heap
    lock = Lock()
    pool = Pool()
    thread_pool = ThreadPool(N_THREADS)
    for result in thread_pool.imap_unordered(run_performance, generate_zip_files()):
        pass
    pool.close()
    pool.join()
    thread_pool.close()
    thread_pool.join()

上面的代码使用了一个任意限制大小为 5 个的多处理线程池,仅作为演示。您可以将N_THREADS 增加或减少到您想要的任何值,但正如我所说,它可能会或可能不会有助于提高性能。如果您希望每个 zip 文件有一个线程,那么:

if __name__ == '__main__':
    zip_files = list(generate_zip_files())
    N_THREADS = len(zip_files)
    free_positions = list(range(N_THREADS)) # already a heap
    lock = Lock()
    pool = Pool()
    thread_pool = ThreadPool(N_THREADS)
    for result in thread_pool.imap_unordered(run_performance, zip_files):
        pass
    pool.close()
    pool.join()
    thread_pool.close()
    thread_pool.join()

【讨论】:

  • 嗨 Booboo,感谢您的解释和新代码。我需要用你的代码调整我的代码。我会尽快通知你
  • 我收到此错误:文件“/opt/import2grafana/bin/libsgrafana.py”,第 241 行,在 get_free_position 中,带锁:NameError: name 'lock' is not defined
  • 您不是偶然修改了代码吗,比如说用于多处理或其他一些更改?
  • 其实我只是修改函数def generate_zip_files(mypath):
  • 我只是尝试使用您的代码。我需要修改你的,因为,我需要过滤 zip 文件。我不需要全部阅读
【解决方案2】:

alive_bar 似乎记住了调用时光标的位置,并从该点开始绘制条形图。当您启动许多进程时,每个进程都不知道另一个进程,并且输出会被打乱。

确实,github 中有一个关于此的未解决问题(请参阅here)。使用多线程有一些 hacky 解决方案,但我认为使用多处理解决它并不容易,除非您在进程间通信上实现某种会减慢速度的方式。

【讨论】:

    【解决方案3】:

    Enlighten 代码库中有一个类似的example。您只需将 process_files() 函数替换为您自己的函数即可。

    在这里重新创建有点大,但想法是您实际上应该只在主进程中进行控制台输出,并使用某种形式的 IPC 来传递来自子进程的信息。 Enlighten 示例为 IPC 使用队列,这非常合理,因为它只发送当前计数。

    【讨论】:

      猜你喜欢
      • 2018-10-13
      • 1970-01-01
      • 2021-12-22
      • 1970-01-01
      • 2017-06-14
      • 1970-01-01
      • 1970-01-01
      • 2017-07-17
      • 2018-04-28
      相关资源
      最近更新 更多