【发布时间】:2013-10-22 04:37:08
【问题描述】:
我正在尝试使用 ipython 的并行处理来并行处理数据。我正在按照@minrk 的指示回答how to get intermidiate results in ipython parallel processing? 上的问题。由于数据是异构的,因此某些处理任务比其他任务完成得更快,我想在它们可用时立即保存它们。我按以下方式执行此操作:
from IPython.parallel import Client
def specialfunc(param):
import time
if param > 8:
raise IOError
else:
time.sleep( param)
return param
client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
然后我可以遍历 asyncmap 并在结果准备好时变为可用:
for i in asyncmap:
print i
问题是我的代码有时会抛出我想处理的异常(上面的示例在调用参数超过 8 时强制出现 IOError)。但是,一旦其中一个引擎出现摇晃,整个异步映射“似乎”就完成了。
我实际上注意到,当我询问 asyncmap.metadata 时,可以很好地找出哪个消息给出了错误 (asyncmap.metadata[i]['pyerr']),但是我不知道如何等待结果像他们一样进来。
所以我的问题是我应该如何处理从我的引擎异步到达的结果,即使它们有时会抛出异常。如何在不打乱控制器中等待结果的情况下捕获引擎中的异常?
【问题讨论】:
标签: python asynchronous parallel-processing ipython