【问题标题】:Python, tracking using tqdm across parallel sub-tasksPython,在并行子任务中使用 tqdm 进行跟踪
【发布时间】:2020-10-08 21:40:14
【问题描述】:

为了控制我正在处理的代码,我尝试创建一个跟踪不同线程中发生的许多任务的单一跟踪。 我在运行开始时就知道任务(和工人)的数量。

用于演示(不起作用,玩具示例):

from multiprocessing import Pool
from tqdm import tqdm

def work(i, t):
    for _ in range(10**6):
        t.update()
    return i

def wrapped_work(params):
    work(*params)

def main(n=1):
    # another loop:
    with Pool(processes=8) as p:
        with tqdm(total=n * 10**6) as t:
            return sum(p.map(work, ((i, t) for i in range(1, n+1))))

if __name__ == "__main__":
    main(5)

我试图用池暗示this topic,但没有成功。 非常感谢您的帮助。

【问题讨论】:

    标签: python multithreading tqdm


    【解决方案1】:

    基于this post

    from multiprocessing import Pool, Process, Value
    from ctypes import c_bool, c_long
    
    from tqdm.auto import tqdm
    
    
    class TqdmMultiprocessing:
        max_processes = 64
    
        def __init__(self, static_func, processes=64):
            self.counter = Value(c_long, lock=False)
            self.pool = Pool(
                processes=min(processes, self.max_processes),
                initializer=self.worker_init,
                initargs=(static_func, self.counter)
            )
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.pool.close()
    
        def tqdm(self, static_func, iterable, **kwargs):
            done_value = Value(c_bool)
    
            proc = Process(target=self.listener, args=(self.counter, done_value, kwargs,))
            proc.start()
    
            result = self.pool.map(static_func, iterable)
    
            done_value.value = True
            proc.join()
            self.counter.value = 0
    
            return result
    
        @staticmethod
        def listener(counter: Value, is_done: Value, kwargs):
            with tqdm(**kwargs) as tqdm_bar:
                old_counter = 0
                while not is_done.value:
                    new_counter = counter.value
                    tqdm_bar.update(new_counter - old_counter)
                    old_counter = new_counter
                tqdm_bar.update(tqdm_bar.total - old_counter)
    
        @staticmethod
        def worker_init(static_func, counter: Value):
            static_func.counter = counter
    
    
    def work(i):
        for _ in range(10**6):
            work.counter.value += 1
        return i
    
    
    def main(n=1):
        with TqdmMultiprocessing(work, processes=3) as p:
            p.tqdm(work, range(n), total=n * 10 ** 6)
            p.tqdm(work, range(n), total=n * 10 ** 6)
    
    
    if __name__ == "__main__":
        main(5)
    

    【讨论】:

    • 在 colab 中我发现了这个 hack(在 tqdm 之前):print('\r', end='', flush=True)
    • 如果有人知道如何改变忙碌的等待,我将不胜感激
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-21
    • 1970-01-01
    • 2016-12-23
    • 1970-01-01
    • 2015-04-11
    • 2012-05-06
    相关资源
    最近更新 更多