【问题标题】:How can we use tqdm in a parallel execution with joblib?我们如何在与 joblib 的并行执行中使用 tqdm?
【发布时间】:2016-10-14 17:30:31
【问题描述】:

我想并行运行一个函数,并等待所有并行节点完成,使用 joblib。就像在示例中一样:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

但是,我希望像 tqdm 一样在单个进度条中看到执行,显示已完成的作业数。

你会怎么做?

【问题讨论】:

标签: python parallel-processing joblib tqdm


【解决方案1】:

只需将range(10) 放入tqdm(...)!这对你来说可能看起来太好了,但它确实有效(在我的机器上):

from math import sqrt
from joblib import Parallel, delayed  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))

【讨论】:

  • 这仅在进程开始时显示进度,而不是在完成时显示:Parallel(n_jobs=10)(delayed(time.sleep)(i ** 2) for i in tqdm(range(10)))
  • 它可以工作,但不适用于例如字符串列表...还尝试将列表包装在iter...
  • @curious95 尝试将列表放入生成器,以下似乎对我有用:from math import sqrt from joblib import Parallel, delayed import multiprocessing from tqdm import tqdm rng = range(100000) rng = ['a','b','c','d'] for j in range(20): rng += rng def get_rng(): i = 0 for i in range(len(rng)): yield rng[i] result = Parallel(n_jobs=2)(delayed(sqrt)(len(i) ** 2) for i in tqdm(get_rng()))
  • 另外一个问题,有一个很优雅的solution来解决这个问题。
  • 这行不通,tqdm 将立即转到 %100。
【解决方案2】:

我已经创建了 pqdm 一个并行的 tqdm 包装器,它带有并发的未来来轻松完成这项工作,试一试!

安装

pip install pqdm

并使用

from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work

def square(a):
    return a*a

result = pqdm(args, square, n_jobs=2)

【讨论】:

  • 干得好家伙!无法忍受你为什么不被接受。非常感谢!
  • 不幸的是,这对我来说失败了。我不知道为什么,但看起来 pqdm 并没有等到函数调用结束。我现在没有时间创建 MWE。不过,感谢您的努力(和 +1)。
  • @YairDaon 可能会尝试使用有界执行器,尝试将bounded=True 添加到 pqdm。
  • 这就像一个魅力,感谢图书馆。有帮助!
  • 它是否适用于列表理解?
【解决方案3】:

修改 nth's great answer 以允许动态标志使用或不使用 TQDM,并提前指定总数,以便正确填写状态栏。

from tqdm.auto import tqdm
from joblib import Parallel

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

【讨论】:

    【解决方案4】:

    如上所述,仅包装传递给joblib.Parallel() 的可迭代的解决方案并不能真正监控执行进度。相反,我建议继承 Parallel 并覆盖 print_progress() 方法,如下所示:

    import joblib
    from tqdm.auto import tqdm
    
    class ProgressParallel(joblib.Parallel):
        def __call__(self, *args, **kwargs):
            with tqdm() as self._pbar:
                return joblib.Parallel.__call__(self, *args, **kwargs)
    
        def print_progress(self):
            self._pbar.total = self.n_dispatched_tasks
            self._pbar.n = self.n_completed_tasks
            self._pbar.refresh()
    

    【讨论】:

      【解决方案5】:

      这里是可能的解决方法

      def func(x):
          time.sleep(random.randint(1, 10))
          return x
      
      def text_progessbar(seq, total=None):
          step = 1
          tick = time.time()
          while True:
              time_diff = time.time()-tick
              avg_speed = time_diff/step
              total_str = 'of %n' % total if total else ''
              print('step', step, '%.2f' % time_diff, 
                    'avg: %.2f iter/sec' % avg_speed, total_str)
              step += 1
              yield next(seq)
      
      all_bar_funcs = {
          'tqdm': lambda args: lambda x: tqdm(x, **args),
          'txt': lambda args: lambda x: text_progessbar(x, **args),
          'False': lambda args: iter,
          'None': lambda args: iter,
      }
      
      def ParallelExecutor(use_bar='tqdm', **joblib_args):
          def aprun(bar=use_bar, **tq_args):
              def tmp(op_iter):
                  if str(bar) in all_bar_funcs.keys():
                      bar_func = all_bar_funcs[str(bar)](tq_args)
                  else:
                      raise ValueError("Value %s not supported as bar type"%bar)
                  return Parallel(**joblib_args)(bar_func(op_iter))
              return tmp
          return aprun
      
      aprun = ParallelExecutor(n_jobs=5)
      
      a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
      a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
      a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
      a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
      

      【讨论】:

      • 这是一个四处走走,但进度条只有在分派任务时才会更新。更新进度条的更好时机是任务完成的时间。
      【解决方案6】:

      如果您的问题由多个部分组成,您可以将这些部分拆分为k 子组,并行运行每个子组并更新其间的进度条,从而使k 更新进度。

      这在文档中的以下示例中得到了演示。

      >>> with Parallel(n_jobs=2) as parallel:
      ...    accumulator = 0.
      ...    n_iter = 0
      ...    while accumulator < 1000:
      ...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
      ...                           for i in range(5))
      ...        accumulator += sum(results)  # synchronization barrier
      ...        n_iter += 1
      

      https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

      【讨论】:

      • 这如何回答关于“单个进度条”的问题?
      • 这绝对不能回答关于进度条的问题
      猜你喜欢
      • 2019-11-28
      • 2022-06-27
      • 1970-01-01
      • 2023-03-04
      • 1970-01-01
      • 2016-09-29
      • 2017-11-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多