【发布时间】:2016-07-28 06:21:48
【问题描述】:
我的任务链包含三个任务 fetch_page、check_source 和 store page
def update_page_info(**headers):
chain=fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
chain().apply_async()
fetch_page 获取页面并收集它需要收集的内容:
@app.task(bind=True)
def fetch_page(self,url):
#fetch_page here and return a tuple so that it can be unpacked
# dosomething
现在获取页面后,它会在下一个任务 check_source 中检查源。
@app.task(bind=True)
def check_source(self,page_and_url,handle):
try:
#unpack your stuffs here
page,url=page_and_url
get_result={}
if handle=='first_option':
get_result=select_first_option(one,two)
return get_result
elif handle=='second_option':
get_result=select_second_option(one,two)
return (get_result)
elif handle=='third_option':
get_result=select_third_option(one,two)
return (get_result)
else:
return "IGNORE FOR NOW"
except Exception as exc:
pass
所以困惑是我可以从这里调用一些其他任务吗?会不会有任何不一致,或者工人会不会在这样做时陷入僵局?
最后它应该执行 store_info() 来存储从 check_source() 返回的东西
@app.task(bind=True)
def store_info(self,result):
print ("store_info ")
try:
#store the fetched pages
except Exception as exc:
#dosomething
finally:
pass
我正在遵循这种方法,只需要稍加修改http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks。
任何人都可以建议我应该如何做以及我需要更加小心的事情吗?
【问题讨论】:
标签: python python-2.7 celery celery-task