【问题标题】:Show the progress of a Python multiprocessing pool imap_unordered call?显示 Python 多处理池 imap_unordered 调用的进度?
【发布时间】:2011-08-05 17:05:45
【问题描述】:

我有一个脚本,它通过imap_unordered() 调用成功地执行了一组多处理池任务:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

但是,我的num_tasks 大约是 250,000,因此 join() 将主线程锁定 10 秒左右,我希望能够逐步回显到命令行以显示主进程未锁定。比如:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

结果对象或池本身是否有指示剩余任务数的方法?我尝试使用multiprocessing.Value 对象作为计数器(do_work 在完成任务后调用counter.value += 1 操作),但计数器在停止递增之前仅达到总值的 85%。

【问题讨论】:

    标签: python multiprocessing


    【解决方案1】:

    快速入门

    使用tqdmmultiprocessing.Pool

    安装

    pip install tqdm
    

    例子

    import time
    import threading
    from multiprocessing import Pool
    
    from tqdm import tqdm
    
    
    def do_work(x):
        time.sleep(x)
        return x
    
    
    def progress():
        time.sleep(3)  # Check progress after 3 seconds
        print(f'total: {pbar.total} finish:{pbar.n}')
    
    
    tasks = range(10)
    pbar = tqdm(total=len(tasks))
    
    if __name__ == '__main__':
        thread = threading.Thread(target=progress)
        thread.start()
        results = []
        with Pool(processes=5) as pool:
            for result in pool.imap_unordered(do_work, tasks):
                results.append(result)
                pbar.update(1)
        print(results)
    

    结果




    烧瓶

    安装

    pip install flask
    

    main.py

    import time
    from multiprocessing import Pool
    
    from tqdm import tqdm
    from flask import Flask, make_response, jsonify
    
    app = Flask(__name__)
    
    
    def do_work(x):
        time.sleep(x)
        return x
    
    
    total = 5  # num of tasks
    tasks = range(total)
    pbar = tqdm(total=len(tasks))
    
    
    @app.route('/run/')
    def run():
        results = []
        with Pool(processes=2) as pool:
            for _result in pool.imap_unordered(do_work, tasks):
                results.append(_result)
                if pbar.n >= total:
                    pbar.n = 0  # reset
                pbar.update(1)
        response = make_response(jsonify(dict(results=results)))
        response.headers.add('Access-Control-Allow-Origin', '*')
        response.headers.add('Access-Control-Allow-Headers', '*')
        response.headers.add('Access-Control-Allow-Methods', '*')
        return response
    
    
    @app.route('/progress/')
    def progress():
        response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
        response.headers.add('Access-Control-Allow-Origin', '*')
        response.headers.add('Access-Control-Allow-Headers', '*')
        response.headers.add('Access-Control-Allow-Methods', '*')
        return response
    

    运行(例如在 Windows 中)

    set FLASK_APP=main
    flask run
    

    API 列表

    test.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Progress Bar</title>
        <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
        <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
        <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
    </head>
    <body>
    <button id="run">Run the task</button>
    <br><br>
    <div class="progress">
        <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
             style="width: 10%">0.00%
        </div>
    </div>
    </body>
    <script>
        function set_progress_rate(n, total) {
            //Set the rate of progress bar
            var rate = (n / total * 100).toFixed(2);
            if (n > 0) {
                $(".progress-bar").attr("aria-valuenow", n);
                $(".progress-bar").attr("aria-valuemax", total);
                $(".progress-bar").text(rate + "%");
                $(".progress-bar").css("width", rate + "%");
            }
        }
    
        $("#run").click(function () {
            //Run the task
            $.ajax({
                url: "http://127.0.0.1:5000/run/",
                type: "GET",
                success: function (response) {
                    set_progress_rate(100, 100);
                    console.log('Results:' + response['results']);
                }
            });
        });
        setInterval(function () {
            //Show progress every 1 second
            $.ajax({
                url: "http://127.0.0.1:5000/progress/",
                type: "GET",
                success: function (response) {
                    console.log(response);
                    var n = response["n"];
                    var total = response["total"];
                    set_progress_rate(n, total);
                }
            });
        }, 1000);
    </script>
    </html>
    

    结果

    【讨论】:

      【解决方案2】:

      一些答案​​适用于进度条,但我无法从池中得到结果

      我使用tqdm 创建进度条 你可以通过pip install tqdm安装它

      下面的简单代码与进度条配合得很好,你也可以得到结果:

      from multiprocessing import Pool
      from tqdm import tqdm
      from time import sleep
      
      tasks = range(5)
      result = []
      
      def do_work(x):
          # do something with x and return the result
          sleep(2)
          return x + 2
      
      if __name__ == '__main__':
          pbar = tqdm(total=len(tasks))
      
          with Pool(2) as p:
              for i in p.imap_unordered(do_work, tasks):
      
                  result.append(i)
                  pbar.update(i)
          
          pbar.close()
          print(result)
      

      【讨论】:

        【解决方案3】:

        在做了一些研究之后,我写了一个名为parallelbar 的小模块。它允许您分别显示池的整体进度和每个核心的进度。 它易于使用并且有很好的描述。

        例如:

        from parallelbar import progress_map
        from parallelbar.tools import cpu_bench
        
        
        if __name__=='__main__':
            # create list of task
            tasks = [1_000_000 + i for i in range(100)]
            progress_map(cpu_bench, tasks)
        

        【讨论】:

          【解决方案4】:

          Pool.apply_async() 的简单解决方案:

          from multiprocessing import Pool
          from tqdm import tqdm
          from time import sleep
          
          
          def work(x):
              sleep(0.2)
              return x**2
          
          
          n = 10
          
          with Pool(4) as p, tqdm(total=n) as pbar:
              res = [p.apply_async(
                  work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
              results = [r.get() for r in res]
          

          【讨论】:

          • 完成后应该关闭 Pool 和 pbar
          • 可能希望避免在最后一行中对池和迭代器使用 varname p
          【解决方案5】:

          按照 Tim 的建议,您可以使用 tqdmimap 来解决此问题。我刚刚偶然发现了这个问题并调整了imap_unordered 解决方案,以便我可以访问映射的结果。以下是它的工作原理:

          from multiprocessing import Pool
          import tqdm
          
          pool = multiprocessing.Pool(processes=4)
          mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
          

          如果您不关心作业返回的值,则无需将列表分配给任何变量。

          【讨论】:

          • 这是最好的答案。在任务完成时显示进度并返回结果。
          【解决方案6】:

          试试这个简单的基于队列的方法,它也可以与池一起使用。请注意,在进度条启动后打印任何内容都会导致它被移动,至少对于这个特定的进度条。 (PyPI的进度1.5)

          import time
          from progress.bar import Bar
          
          def status_bar( queue_stat, n_groups, n ):
          
              bar = Bar('progress', max = n)  
          
              finished = 0
              while finished < n_groups:
          
                  while queue_stat.empty():
                      time.sleep(0.01)
          
                  gotten = queue_stat.get()
                  if gotten == 'finished':
                      finished += 1
                  else:
                      bar.next()
              bar.finish()
          
          
          def process_data( queue_data, queue_stat, group):
          
              for i in group:
          
                  ... do stuff resulting in new_data
          
                  queue_stat.put(1)
          
              queue_stat.put('finished')  
              queue_data.put(new_data)
          
          def multiprocess():
          
              new_data = []
          
              groups = [[1,2,3],[4,5,6],[7,8,9]]
              combined = sum(groups,[])
          
              queue_data = multiprocessing.Queue()
              queue_stat = multiprocessing.Queue()
          
              for i, group in enumerate(groups): 
          
                  if i == 0:
          
                      p = multiprocessing.Process(target = status_bar,
                          args=(queue_stat,len(groups),len(combined)))
                          processes.append(p)
                          p.start()
          
                  p = multiprocessing.Process(target = process_data,
                  args=(queue_data, queue_stat, group))
                  processes.append(p)
                  p.start()
          
              for i in range(len(groups)):
                  data = queue_data.get() 
                  new_data += data
          
              for p in processes:
                  p.join()
          

          【讨论】:

            【解决方案7】:

            我发现在我尝试检查进度时工作已经完成。这就是使用 tqdm 对我有用的方法。

            pip install tqdm

            from multiprocessing import Pool
            from tqdm import tqdm
            
            tasks = range(5)
            pool = Pool()
            pbar = tqdm(total=len(tasks))
            
            def do_work(x):
                # do something with x
                pbar.update(1)
            
            pool.imap_unordered(do_work, tasks)
            pool.close()
            pool.join()
            pbar.close()
            

            这应该适用于所有类型的多处理,无论它们是否阻塞。

            【讨论】:

            • 我认为创建了一堆线程,每个线程都是独立计数的
            • 我的函数中有函数导致酸洗错误。
            • 这不会为我创建一个进度条,但它有点工作。它计算迭代次数(并显示总的预期迭代次数)。尽管由于线程的原因计数会上升和下降(我猜),但在任何时候都或多或少地看到它的位置并不难。到目前为止,这对我来说最有效(我必须使用返回值,这会使其他答案复杂化)。
            【解决方案8】:

            我个人最喜欢的——在事情并行运行和提交时,给你一个漂亮的小进度条和完成 ETA。

            from multiprocessing import Pool
            import tqdm
            
            pool = Pool(processes=8)
            for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
                pass
            

            【讨论】:

            • 如果池返回值怎么办?
            • 我在循环之前创建了一个名为 result 的空列表,然后在循环内只执行 result.append(x)。我用 2 个进程尝试了这个,并使用 imap 而不是 map,一切都按照我想要的方式运行 @nickpick
            • 所以我的进度条正在迭代新行而不是原地进行,知道为什么会这样吗?
            • 别忘了把这段代码包装在if __name__ == "__main__":中,否则它可能会莫名其妙地不起作用
            • @bs7280 你所说的 result.append(x) 是指 result.append(_) 吗? x是什么?
            【解决方案9】:

            我创建了一个自定义类来创建进度打印输出。也许这会有所帮助:

            from multiprocessing import Pool, cpu_count
            
            
            class ParallelSim(object):
                def __init__(self, processes=cpu_count()):
                    self.pool = Pool(processes=processes)
                    self.total_processes = 0
                    self.completed_processes = 0
                    self.results = []
            
                def add(self, func, args):
                    self.pool.apply_async(func=func, args=args, callback=self.complete)
                    self.total_processes += 1
            
                def complete(self, result):
                    self.results.extend(result)
                    self.completed_processes += 1
                    print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))
            
                def run(self):
                    self.pool.close()
                    self.pool.join()
            
                def get_results(self):
                    return self.results
            

            【讨论】:

              【解决方案10】:

              通过更多挖掘自己找到了答案:查看imap_unordered 结果对象的__dict__,我发现它有一个_index 属性,该属性随着每个任务完成而递增。所以这适用于日志记录,包裹在 while 循环中:

              p = multiprocessing.Pool()
              rs = p.imap_unordered(do_work, xrange(num_tasks))
              p.close() # No more work
              while (True):
                completed = rs._index
                if (completed == num_tasks): break
                print "Waiting for", num_tasks-completed, "tasks to complete..."
                time.sleep(2)
              

              但是,我确实发现将imap_unordered 替换为map_async 会导致执行速度更快,尽管结果对象有点不同。相反,来自map_async 的结果对象具有_number_left 属性和ready() 方法:

              p = multiprocessing.Pool()
              rs = p.map_async(do_work, xrange(num_tasks))
              p.close() # No more work
              while (True):
                if (rs.ready()): break
                remaining = rs._number_left
                print "Waiting for", remaining, "tasks to complete..."
                time.sleep(0.5)
              

              【讨论】:

              • 我在 Python 2.7.6 上对此进行了测试,rs._number_left 似乎是剩余的块数。因此,如果 rs._chunksize 不是 1,那么 rs._number_left 将不是剩余的列表项数。
              • 我应该把这段代码放在哪里?我的意思是直到知道rs 的内容之后才执行它并且有点晚了?
              • @WakanTanka:它在分离额外线程后进入主脚本。在我原来的例子中,它进入了“while”循环,rs 已经启动了其他线程。
              • 您能否编辑您的问题和/或答案以显示最低工作示例。我在任何循环中都看不到rs,我是多处理新手,这会有所帮助。非常感谢。
              • 至少在python 3.5 中,使用_number_left 的解决方案不起作用。 _number_left 表示仍有待处理的块。例如,如果我想将 50 个元素并行传递给我的函数,那么对于具有 3 个进程的线程池 _map_async() 创建 10 个块,每个块包含 5 个元素。 _number_left 然后表示这些块中有多少已经完成。
              【解决方案11】:

              我知道这是一个相当老的问题,但是当我想在 python 中跟踪任务池的进展时,我正在做这件事。

              from progressbar import ProgressBar, SimpleProgress
              import multiprocessing as mp
              from time import sleep
              
              def my_function(letter):
                  sleep(2)
                  return letter+letter
              
              dummy_args = ["A", "B", "C", "D"]
              pool = mp.Pool(processes=2)
              
              results = []
              
              pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
              
              r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
              
              while len(results) != len(dummy_args):
                  pbar.update(len(results))
                  sleep(0.5)
              pbar.finish()
              
              print results
              

              基本上,您将 apply_async 与 callbak 一起使用(在这种情况下,它将返回的值附加到列表中),因此您不必等待执行其他操作。然后,在一个 while 循环中,您检查工作的进度。在这种情况下,我添加了一个小部件以使其看起来更好。

              输出:

              4 of 4                                                                         
              ['AA', 'BB', 'CC', 'DD']
              

              希望对你有帮助。

              【讨论】:

              • 必须更改:[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args](pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
              • 这不是真的。生成器对象在这里不起作用。已检查。
              【解决方案12】:

              不需要访问结果集的私有属性:

              from __future__ import division
              import sys
              
              for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
                  sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
              

              【讨论】:

              • 我只在代码退出后看到打印输出(不是每次迭代)。你有什么建议吗?
              • @HananShteingart:它在我的系统(Ubuntu)上运行良好,同时使用 Python 2 和 3。我以 def do_word(*a): time.sleep(.1) 为例。如果它对您不起作用,则创建一个 complete minimal code example 来演示您的问题:使用文字描述您期望发生什么以及会发生什么,提及您如何运行 Python 脚本、您的操作系统、Python 版本和post it as a new question.
              • 我遇到了和@HananShteingart 一样的问题:这是因为我尝试使用Pool.map()。我没有意识到 only imap()imap_unordered() 以这种方式工作 - 文档只是说“map() 的惰性版本”,​​但实际上意味着“底层迭代器返回结果,因为它们进来”。
              • @simonmacmullen:问题和我的答案都使用imap_unordered()。 Hanan 的问题可能是由于sys.stderr.write('\r..')(覆盖同一行以显示进度)。
              • 也可以!我主要是想记录下我所做的一个愚蠢的假设——以防其他阅读这篇文章的人也做到了。
              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2017-06-14
              • 2013-12-01
              • 2022-01-08
              • 2021-07-24
              • 2020-08-14
              • 2016-11-10
              相关资源
              最近更新 更多