【问题标题】:python multiprocess using map, but with one sub-process runningpython多进程使用map,但有一个子进程正在运行
【发布时间】:2021-05-24 09:14:07
【问题描述】:

我是pythonmap()函数实现并行代码的新手。

def main_function(sample):
    # ......(only input file; calculations; and output file)

if __name__ == "__main__":
    list_sample_common = os.listdir('/lustre/scratch/Stat/s1155136154/ONT_Panel2')# WES,ONT_panel, Pacibo_Panel intersection.
    list_sample_Pacibo_normal = ['RMH12', 'RMH15','RMH20','RMH25','RMH3.','RMH7.','RMH9.']# normal people sample
    list_sample_ONT_cDNA_only = ['RM66T','RM68T','RM77T']
    sample = list_sample_common + list_sample_Pacibo_normal + list_sample_ONT_cDNA_only
    pool=Pool()
    pool.map(main_function,sample)
    pool.close()
    pool.join()

所以当我第一次在集群上使用它时,子进程以 500% 的 CPU 运行(因为我在集群中应用了 5 个核心)。

但是,一段时间后,只有一个核心在运行:

那么,原因是主函数包含输出和输入操作吗?而且由于主进程只将短列表传递给子函数,我相信参数大小不会影响速度。

【问题讨论】:

    标签: python dictionary multiprocessing pool


    【解决方案1】:

    这只是一个有根据的猜测,因为我对sample 的大小以及您的工作函数main_function 正在执行的工作的详细信息知之甚少

    让我们假设您传递给Pool.map 方法的iterable sample 的长度为 70,正如您所说,您的池大小为 5。map 方法将中断将 70 个任务分成chunksize 大小的任务组,将这些块分配给池中的 5 个进程中的每一个。如果您没有为 map 方法指定 chunksize 参数,它会根据迭代的大小 (70) 和池的大小 (5) 计算值,如下所示:

    def compute_chunksize(iterable_size, pool_size):
        chunksize, remainder = divmod(iterable_size, pool_size * 4)
        if remainder:
            chunksize += 1
        return chunksize
    

    所以对于您的值,chunksize 将是 4。因此将有 17 个大小为 4 的任务块和一个较小的第 18 个大小为 2 的块分布在 5 个进程中(每一列是给定任务的队列池中的进程):

    4 4 4 4 4
    4 4 4 4 4
    4 4 4 4 4
    4 4 2
    

    假设所有任务的处理时间相同,您可以看到,经过一定时间后,最后 2 个进程将完成分配给它们的 12 个任务,现在将处于空闲状态,您将仅以 60% 的速度运行.最终,第三个进程将完成其任务,您现在将以 40% 的速度运行。

    但是您可以看到sample 大小和池大小的正确组合,您可能会遇到只运行一个进程的情况。较大的 chunksize 值会加剧这种情况,这旨在减少排队任务所需的共享内存访问次数,但可能会导致 CPU 利用率低下。

    作为一个实验,尝试重新运行您的程序,为您的 map 调用显式指定 1 的 chunksize 参数。除非任务数量是池大小的倍数,并且每个任务都需要相同的时间来完成,否则即使这样,您也不能期望每个处理器都有一个任务要运行。 事实上,除了只有一个进程在运行最终任务之外,其他的情况很少见。但这应该会减少只有一个进程运行的时间百分比。一个处理器正忙。但是使用 1 的 chunksize 被认为对于大型迭代是低效的。

    带有 4 个进程池的演示,其中第一个进程获取所有长时间运行的任务

    这里有 16 个任务以 4 的 chunksize 提交到大小为 4 的池中,因此第一个进程可以运行前 4 个任务,并且人为地使这些任务的运行时间比其他任务长 10 倍。我们返回与子流程关联的标识符,以证明一个特定流程正在处理前 4 个任务:

    from multiprocessing import Pool, current_process
    import re
    import time
    
    def get_id():
        m = re.search(r'SpawnPoolWorker-(\d+)', str(current_process()))
        return int(m[1])
    
    def worker(i):
        R = 10000000
        id = get_id()
        t = time.time()
        # run up the cpu:
        cnt = 0
        for _ in range(R * 10 if i <= 3 else R):
            cnt += 1
        return i, id, time.time() - t
    
    
    
    if __name__ == '__main__':
        p = Pool(4)
        # 4 tasks per process:
        results = p.map(worker, range(16), chunksize=4) # first process gets arguments: 0, 1, 2, 3
        for result in results:
            i, id, elapsed_time = result
            print(f'i={i}, process id={id}, elapsed time={elapsed_time}')
    

    打印:

    i=0, process id=1, elapsed time=6.197998046875
    i=1, process id=1, elapsed time=5.889002323150635
    i=2, process id=1, elapsed time=5.952000856399536
    i=3, process id=1, elapsed time=6.022995948791504
    i=4, process id=2, elapsed time=0.6909992694854736
    i=5, process id=2, elapsed time=0.8339993953704834
    i=6, process id=2, elapsed time=0.5869994163513184
    i=7, process id=2, elapsed time=0.7560005187988281
    i=8, process id=3, elapsed time=0.7500002384185791
    i=9, process id=3, elapsed time=0.7440023422241211
    i=10, process id=3, elapsed time=0.7600002288818359
    i=11, process id=3, elapsed time=0.7479968070983887
    i=12, process id=4, elapsed time=0.7950015068054199
    i=13, process id=4, elapsed time=0.7909986972808838
    i=14, process id=4, elapsed time=0.8639986515045166
    i=15, process id=4, elapsed time=0.7230024337768555
    

    重要提示:我可能说过一些事情是对实际发生的事情的简化。有一个任务输入队列。任务以chunksize 组块的形式放置在此队列中,并且池中的进程在空闲时将下一个chunksize 组从队列中取出以进行处理。我在图表中暗示,这些块在一开始就预先分配给所有进程,但情况并非如此。在我上面的演示中,我选择了一个chunksize,它基本上导致所有块都被处理(如果没有指定,默认chunksize将是1)。但有时,如果任务的处理是微不足道的(例如,只是一个 return None 语句),第一个进程甚至可能获取所有块,而在上面的演示中并非如此。拥有包含所有块的单个队列的含义是,当 chunksize 为 1 时,处理器永远不应处于不必要的空闲状态。

    【讨论】:

    • 感谢您的回答!我会试试看。实际上,样本是一个长度为 56 的列表。我认为,它并不大。所以将块大小设置为 1 会起作用。谢谢!
    • 请注意,即使 chunksize 为 1,其中所有任务都尽可能均匀地分布在所有进程中,其中一个进程可能具有运行时间更长的任务,尽管所有其他进程进程已经完成了所有任务的处理,一个进程可能还有 N 个任务要处理,并且将工作重新分配给其他空闲进程已经太晚了。
    • 另外,还有一个问题:在这种情况下,map_async() 是否比 map() 有更好的性能?
    • 谢谢布布,我明白了。最后的时间将由最重的子任务决定。
    • 我添加了一个演示和一个重要说明,以澄清我所说的过于简单化的内容。
    猜你喜欢
    • 2023-03-07
    • 2017-12-21
    • 2021-04-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多