[Question Title]: Processing results from asyncmap as they come in
[Posted]: 2013-10-22 04:37:08
[Question]:

I am trying to use IPython's parallel processing to work on my data in parallel. I am following @minrk's instructions in his answer to "how to get intermidiate results in ipython parallel processing?". Since the data is heterogeneous, some processing tasks finish sooner than others, and I would like to save their results as soon as they become available. I do this the following way:

from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep( param)
        return param

client = Client()
balanced       = client.load_balanced_view()
balanced.block = False
param_list = range(10)   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

I can then iterate over asyncmap and results become available as soon as they are ready:

for i in asyncmap:
    print i

The problem is that my code sometimes throws exceptions which I want to handle (the example above forces an IOError for call parameters greater than 8). However, as soon as one of the engines hiccups, the whole asyncmap "appears" to be finished.

I actually noticed that when I interrogate asyncmap.metadata I can figure out perfectly well which message gave an error (asyncmap.metadata[i]['pyerr']), but then I don't know how to wait for the remaining results to come in as they are ready.

So my question is: how should I process the results arriving asynchronously from my engines, even if they do sometimes throw exceptions? And how do I catch the exceptions in the engines without upsetting the waiting for results in the controller?

[Comments]:

    Tags: python asynchronous parallel-processing ipython
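As a point of reference, the behaviour the question asks for (consume results as each task finishes, and catch each task's exception individually) is what the standard library's concurrent.futures provides via as_completed. A minimal sketch with threads standing in for IPython engines (the sleep is dropped so it runs instantly; this is not the IPython.parallel API itself):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def specialfunc(param):
    # same toy task as in the question, minus the sleep
    if param > 8:
        raise IOError("bad parameter %r" % param)
    return param

with ThreadPoolExecutor(max_workers=4) as pool:
    # map each future back to the parameter that produced it
    futures = {pool.submit(specialfunc, p): p for p in range(10)}
    results, failures = {}, {}
    for fut in as_completed(futures):      # yields futures as they finish
        param = futures[fut]
        try:
            results[param] = fut.result()  # re-raises the task's exception
        except IOError as err:
            failures[param] = err

print(sorted(results))   # parameters 0..8 succeeded
print(sorted(failures))  # parameter 9 raised IOError
```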


    [Solution 1]:

    I know it sounds a bit silly, but you could return a special value to indicate an error, e.g. -1, None, or a string. To get around map_async, what I have done is loop over the parameters and fire off apply_async, storing the results in a list. Then I loop over the list, trying to get the results and processing them one at a time. It looks something like this:

    from IPython import parallel   # for parallel.TimeoutError

    n_cores = len(c.ids)
    calls = []
    for n, p in enumerate(params):
        core = c.ids[n % n_cores]
        calls.append(c[core].apply_async(f, p))

    # then you get the results

    while calls != []:
        for call in calls[:]:   # iterate over a copy so items can be removed
            try:
                result = call.get(1e-3)
                process(result)
                calls.remove(call)
                # in case your call failed, you can apply_async again
                # and append the new call to calls
            except parallel.TimeoutError:
                pass


    Alternatively you can use c[core].apply() and check on the calls with ready(). It is basically the same thing without the exception handling. The annoying part is that this hogs quite a lot of memory, since the results and other dicts of every call are hard to clear.
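    The poll-with-timeout loop above can be tried without a cluster: the stdlib's multiprocessing.pool offers the same apply_async / get(timeout) shape, with multiprocessing.TimeoutError in place of parallel.TimeoutError. A minimal, self-contained sketch (the ThreadPool and toy specialfunc are stand-ins, not the IPython API):

```python
import multiprocessing
import multiprocessing.pool
import time

def specialfunc(param):
    time.sleep(param * 0.01)   # stagger completion times
    if param > 8:
        raise IOError("bad parameter %r" % param)
    return param

pool = multiprocessing.pool.ThreadPool(4)
calls = [(p, pool.apply_async(specialfunc, (p,))) for p in range(10)]

done, failed = [], []
while calls:
    for param, call in calls[:]:       # iterate over a copy while removing
        try:
            result = call.get(1e-3)    # tiny timeout: don't block the loop
        except multiprocessing.TimeoutError:
            continue                   # not finished yet, check again later
        except IOError:
            failed.append(param)       # the task itself raised
        else:
            done.append(result)
        calls.remove((param, call))    # handled: drop it from the queue
pool.close()
pool.join()
```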

    I was doing something similar to this here, and I decided map_async just didn't work for me. This might also be relevant, in case you decide to go for that approach.

    Cheers.

    PS: I think this is basically what you implemented above, but I find it more natural to deal with the calls individually and then stack them into the map, especially if you might want to reprocess some of them later.

    [Discussion]:

      [Solution 2]:

      Inspired by ipython/*/examples/parallel/customresults.py, I came up with this solution:

      from IPython import parallel   # for parallel.TimeoutError
      asyncmap = balanced.map(specialfunc, param_list, ordered=False)
      
      #create original mapping of msg_ids to parameters
      # maybe just a quick way to find which parameter gave what result
      msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))
      
      pending = set(asyncmap.msg_ids) # all queued jobs are pending
      while pending:   # we'll come back as long as finished jobs haven't been looked at yet
          try:
              client.wait(pending, 1e-3)
          except parallel.TimeoutError:
              # ignore timeouterrors, since they only mean that at least one isn't done
              pass
      
          # finished is the set of msg_ids that are complete
          finished = pending.difference(client.outstanding)
          # update pending to exclude those that just finished
          pending = pending.difference(finished)
          for msg_id in finished:
              # we know these are done, so don't worry about blocking
              ar = client.get_result(msg_id)
              # checking whether any exceptions occurred when code ran on the engine
              if ar.metadata['pyerr'] is None:
                  print "job id %s finished on engine %i " % (msg_id, ar.engine_id)
                  print "and results for parameter %i :" % msg_ids_to_parameters[msg_id]
                  # note that each job in a map always returns a list of length chunksize
                  # even if chunksize == 1
                  for res in ar.result:
                      print " item %i \n" % res
              else:
                  print('this went wrong for %i (%s)' % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr']))
      

      Basically, the change from the example code is to look at the metadata and see whether an error was recorded, and only if none was to go ahead and retrieve the result via ar.result.
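      The pending-set bookkeeping can also be sketched with the stdlib alone: concurrent.futures.wait() plays the role of client.wait() and returns the finished subset directly, and Future.exception() stands in for checking metadata['pyerr']. A self-contained sketch under those stand-ins (threads, not engines):

```python
import concurrent.futures

def specialfunc(param):
    if param > 8:
        raise IOError("bad parameter %r" % param)
    return param

pool = concurrent.futures.ThreadPoolExecutor(4)
# map each future back to its parameter, like msg_ids_to_parameters
pending = {pool.submit(specialfunc, p): p for p in range(10)}
ok, errors = [], {}

while pending:   # come back as long as finished jobs haven't been looked at
    finished, _ = concurrent.futures.wait(
        pending, timeout=0.1,
        return_when=concurrent.futures.FIRST_COMPLETED)
    for fut in finished:
        param = pending.pop(fut)       # drop from pending, like the example
        err = fut.exception()          # analogue of metadata['pyerr']
        if err is None:
            ok.append(fut.result())
        else:
            errors[param] = err
pool.shutdown()
```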

      [Discussion]:

      • By the way: does the map method of load_balanced_view really take an ordered keyword? I thought map was ordered by definition.