【Title】: Why does the get() operation in multiprocessing.Pool.map_async take so long?
【Posted】: 2017-12-09 13:04:09
【Question】:
import multiprocessing as mp
import numpy as np

pool   = mp.Pool( processes = 4 )
inp    = np.linspace( 0.01, 1.99, 100 )
result = pool.map_async( func, inp ) #Line1 ( func is some Python function which acts on input )
output = result.get()                #Line2

So, I am trying to parallelize some code in Python, using the .map_async() method on a multiprocessing.Pool() instance.

I noticed that
Line1 takes roughly a thousandth of a second,
while Line2 takes about 0.3 seconds.

Is there a better way to do this, or a way around the bottleneck caused by Line2?

Am I doing something wrong here?

(I am fairly new to this.)

【Comments】:

  • map_async() merely starts the processing. get(), on the other hand, has to wait until all processes have finished and the results are in. What else did you expect to happen?
  • If your goal is to get results as they become available, rather than waiting for all tasks to finish, you would usually iterate over the results of imap (or imap_unordered for extra speed, if you do not care about ordering).

Tags: python parallel-processing multiprocessing parallelism-amdahl


【Solution 1】:

Am I doing something wrong here?

Don't panic, many users do the same - pay more than they receive.

This is a common lecture, not about using some "promising" syntax-constructor, but about paying the actual costs of using it.

The story is long, the effect straightforward - you expected a low-hanging fruit, but had to pay the immense costs of process-instantiation, of work-package re-distribution and of the collection of results, all that circus done just for a few rounds of func()-calls.


Wow?
Stop!
Parallelisation was brought to me so that it would speed the processing up?!?

Well, who told you that any such ( potential ) speedup is for free?

Let's rather be quantitative and measure the actual code-execution times, instead of emotions, right?

Benchmarking is always fair.
It helps us, mere mortals, to escape from just-expectations
and to get ourselves into quantitative, records-of-evidence supported knowledge:

from zmq import Stopwatch; aClk = Stopwatch() # this is a handy tool to do so
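In case pyzmq is not installed, a stdlib-only stand-in with the same .start() / .stop() microsecond interface can be sketched:

```python
import time

class Stopwatch:
    """Minimal stand-in for zmq.Stopwatch: .start() / .stop() in [us]."""
    def start(self):
        self._t0 = time.perf_counter_ns()
    def stop(self):
        # elapsed time since .start(), in microseconds
        return (time.perf_counter_ns() - self._t0) // 1000

aClk = Stopwatch()
aClk.start()
_ = sum(range(1000))
elapsed_us = aClk.stop()
print(elapsed_us)   # a small, non-negative number of microseconds
```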

An as-is test:

Before moving any further, this pair ought to get recorded:

>>> aClk.start(); _ = [   func( SEQi ) for SEQi in inp ]; aClk.stop() # [SEQ] 
>>> HowMuchWillWePAY2RUN( func, 4, 100 )                              # [RUN]
>>> HowMuchWillWePAY2MAP( func, 4, 100 )                              # [MAP]

This sets the span from a pure-[SERIAL] [SEQ] of calls, to an un-optimised joblib.Parallel(), or any other performance envelope, should one wish to extend the experiment with any other tools, like the said multiprocessing.Pool() or others.


Test-case A:

Intent:
so as to measure the cost of a { process | job }-instantiation, we need a NOP-work-package payload, that will spend almost nothing "there", yet will return "back", without needing to pay any additional add-on costs (be it for any input-parameter transmissions or for returning any value).

def a_NOP_FUN( aNeverConsumedPAR ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             so as to be able to benchmark
                             all the process-instantiation
                             add-on overhead costs.
    """
    pass

So, the setup-overhead add-on costs comparison comes here:

#-------------------------------------------------------<function a_NOP_FUN
[SEQ]-pure-[SERIAL] worked within ~   37 ..     44 [us] on this localhost
[MAP]-just-[CONCURRENT] tool        2536 ..   7343 [us]
[RUN]-just-[CONCURRENT] tool      111162 .. 112609 [us]

A strategy of using joblib.delayed()
on a joblib.Parallel() task-processing:

def HowMuchWillWePAY2RUN( aFun2TEST = a_NOP_FUN, JOBS_TO_SPAWN = 4, RUNS_TO_RUN = 10 ):
    import joblib                                 # joblib import was missing
    from zmq import Stopwatch; aClk = Stopwatch()
    try:
         aClk.start()
         joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                          )( joblib.delayed( aFun2TEST )
                                           ( aFunPARAM )
                                       for ( aFunPARAM )
                                       in  range( RUNS_TO_RUN )
                             )
    except:
         pass
    finally:
         try:
             _ = aClk.stop()
         except:
             _ = -1
             pass
    pass;  pMASK = "CLK:: {0:_>24d} [us] @{1: >4d}-JOBs ran{2: >6d} RUNS {3:}"
    print( pMASK.format( _,
                         JOBS_TO_SPAWN,
                         RUNS_TO_RUN,
                         " ".join( repr( aFun2TEST ).split( " ")[:2] )
                         )
            )

A light-weight strategy, using the .map_async() method
on a multiprocessing.Pool() instance:

def HowMuchWillWePAY2MAP( aFun2TEST = a_NOP_FUN, PROCESSES_TO_SPAWN = 4, RUNS_TO_RUN = 1 ):
    from zmq import Stopwatch; aClk = Stopwatch()
    try:
         import numpy           as np
         import multiprocessing as mp

         pool = mp.Pool( processes = PROCESSES_TO_SPAWN )
         inp  = np.linspace( 0.01, 1.99, 100 )

         aClk.start()
         for i in range( RUNS_TO_RUN ):          # range() for Python 3 ( was xrange() )
             pass;    result = pool.map_async( aFun2TEST, inp )
             output = result.get()
         pass
    except:
         pass
    finally:
         try:
             _ = aClk.stop()
         except:
             _ = -1
             pass
    pass;  pMASK = "CLK:: {0:_>24d} [us] @{1: >4d}-PROCs ran{2: >6d} RUNS {3:}"
    print( pMASK.format( _,
                         PROCESSES_TO_SPAWN,
                         RUNS_TO_RUN,
                         " ".join( repr( aFun2TEST ).split( " ")[:2] )
                         )
            )

So,
the first set of pains and surprises
comes straight at the actual cost-of-doing-NOTHING in a concurrent-pool of joblib.Parallel():

 CLK:: __________________117463 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN
 CLK:: __________________111182 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110229 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110095 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________111794 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110030 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110697 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: _________________4605843 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________336208 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________298816 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________355492 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________320837 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________308365 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________372762 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________304228 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________337537 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________941775 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: __________________987440 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: _________________1080024 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: _________________1108432 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: _________________7525874 [us] @ 123-JOBs ran100000 RUNS <function a_NOP_FUN

So, this scientifically fair and rigorous test has, from this simplest-ever case, already started to show the benchmarked costs of all the associated code-execution processing setup-overheads - the smallest-ever joblib.Parallel() penalties sine-qua-non.

This forwards us into a direction, where the real-world algorithms do live - best with next adding some bigger-and-bigger "payload"-sizes into the test-loop.


Now we know
the penalty for entering into a "just"-[CONCURRENT] code-execution - so, what next?

Using this systematic and light-weight approach, we may go forward in the story, as we will also need to benchmark the add-on costs and the other Amdahl's Law indirect effects of { remote-job-PAR-XFER(s) | remote-job-MEM.alloc(s) | remote-job-CPU-bound-processing | remote-job-fileIO(s) }.

A function template like this may help in the re-testing (as you see, there will be a lot to re-run, while the O/S noise and some additional artifacts will step into the actual cost-of-use patterns):


Test-case B:

Once we have paid the up-front cost, the next most common mistake is to forget about the costs of memory allocations. So, let's test it:

def a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR( aNeverConsumedPAR, SIZE1D = 1000 ):
    """                                                 __doc__
    The intent of this FUN() is to do nothing but
                             a MEM-allocation
                             so as to be able to benchmark
                             all the process-instantiation
                             add-on overhead costs.
    """
    import numpy as np              # yes, deferred import, libs do defer imports
    aMemALLOC = np.zeros( ( SIZE1D, #       so as to set
                            SIZE1D, #       realistic ceilings
                            SIZE1D, #       as how big the "Big Data"
                            SIZE1D  #       may indeed grow into
                            ),
                          dtype = np.float64,
                          order = 'F'
                          )         # .ALLOC + .SET
    aMemALLOC[2,3,4,5] = 8.7654321  # .SET
    aMemALLOC[3,3,4,5] = 1.2345678  # .SET

    return aMemALLOC[2:3,3,4,5]

In case your platform stops being able to allocate the requested memory-blocks, there we head into another kind of problem (plus a class of hidden glass-ceilings, if trying to go-parallel in a physical-resources agnostic manner). One may edit the SIZE1D scaling, so as to at least fit into the platform's RAM addressing / sizing capabilities, yet the performance envelopes of the real-world problem computing are still of our great interest here:
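The default SIZE1D = 1000 is intentionally far beyond any realistic RAM: a quick up-front arithmetic check shows why the allocation must fail:

```python
import numpy as np

SIZE1D  = 1000                                  # default used by the allocator above
n_cells = SIZE1D ** 4                           # 4-D: SIZE1D x SIZE1D x SIZE1D x SIZE1D
n_bytes = n_cells * np.dtype(np.float64).itemsize

print(f"{n_bytes / 2**40:,.1f} TiB per a single call")  # → 7.3 TiB per a single call
```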

>>> HowMuchWillWePAY2RUN( a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR, 200, 1000 )

may yield
costs-to-pay of anywhere between 0.1 [s] and +9 [s] (!!)
for still doing nothing at all, but now also without forgetting about some realistic MEM-allocation add-on costs "there":

CLK:: __________________116310 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________120054 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________129441 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________123721 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________127126 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________124028 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________305234 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________243386 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________241410 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________267275 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________244207 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________653879 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________405149 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________351182 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________362030 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: _________________9325428 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________680429 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________533559 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: _________________1125190 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________591109 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR

Test-case C:

read the tail sections of this post

Test-case D:

read the tail sections of this post


Epilogue:

For each and every "promise", the fair best next step is to first cross-validate the actual code-execution costs, before starting any code re-engineering. The sum of the real-world platform's add-on costs may devastate any expected speedup, even though the original, overhead-naive Amdahl's Law might have promised some expected speedup-effects.
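For a quantitative feel, an overhead-strict re-formulation of Amdahl's Law can be sketched in a few lines (pSO / pTO are illustrative names for the setup and termination overheads, expressed as fractions of the original [SERIAL] run-time):

```python
def speedup(p, N, pSO=0.0, pTO=0.0):
    """Overhead-strict Amdahl's Law speedup.

    p   -- parallelisable fraction of the original [SERIAL] run-time
    N   -- number of workers
    pSO -- setup overhead       (spawning, work-package distribution)
    pTO -- termination overhead (results collection)
    """
    return 1.0 / ((1.0 - p) + pSO + p / N + pTO)

# overhead-naive expectation vs overhead-aware reality, for p = 0.95, N = 4:
print(round(speedup(0.95, 4), 2))              # → 3.48
print(round(speedup(0.95, 4, 0.10, 0.05), 2))  # → 2.29
```

Even modest overheads eat more than a third of the naively expected speedup, which is exactly what the benchmarks above keep demonstrating.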

As Mr. Walter E. Deming has expressed many times, without data we leave ourselves to nothing but opinions.


Bonus section:
Having read this far, one may have already found that there is no kind of "drawback" or "error" in #Line2 per se, but careful design practice will exhibit any better syntax-constructor, that spends less to achieve more (as the actual resources (CPU, MEM, IO, O/S) of the code-execution platform permit). Anything else is not principally different from just blindly telling a fortune.

【Discussion】:
