【发布时间】:2015-01-21 22:15:49
【问题描述】:
我正在尝试将 celery 任务设置为 Django 站点的一部分,该站点由运行处理管道的多个步骤的 celerybeat 调用。
第一步下载数据文件,第二步在这些文件中绘制数据,第三步将这些图切割成谷歌地图的瓦片。
第一步只是一个任务,但第二步和第三步是一组任务,因为每个数据文件都有几十个图。
我想做的是让 celerybeat 调用类似的任务
@shared_task
def pipeline():
job = chain(download.s(), get_plot_task_group().s(), spacer.s(), tile.s())
result = job.apply_async()
return result
其中 celery 需要间隔器以等待绘图组在平铺之前完成组中的所有任务,如下所述:celery - chaining groups and subtasks. -> out of order execution
不过,问题在于 get_plot_task_group() 需要有关数据文件的信息来构建任务组,但 celery 会立即执行它,而不是在 download.s() 完成后执行。有没有办法告诉 celery 等待调用该函数?
我也尝试让 get_plot_task_group 本身成为一个返回组的任务,但我无法在链中或链完成后调用该组。我必须调用 result.get() 来获取组对象,并且在任务中调用 result.get() 是不好的做法。我也看不到使用回调的方法。
如果您有任何见解,我将不胜感激。
tl;dr:我必须在另一个任务中创建一组任务,然后运行它,但我不确定如何。
【问题讨论】:
-
您是否在工作流程中的任务之间传递结果?