【问题标题】:Celery task that returns group in chain返回链中组的芹菜任务
【发布时间】:2015-01-21 22:15:49
【问题描述】:

我正在尝试将 celery 任务设置为 Django 站点的一部分,该站点由运行处理管道的多个步骤的 celerybeat 调用。

第一步下载数据文件,第二步在这些文件中绘制数据,第三步将这些图切割成谷歌地图的瓦片。

第一步只是一个任务,但第二步和第三步是一组任务,因为每个数据文件都有几十个图。

我想做的是让 celerybeat 调用类似的任务

@shared_task
def pipeline():
    job = chain(download.s(), get_plot_task_group().s(), spacer.s(), tile.s())
    result = job.apply_async()
    return result

其中 celery 需要间隔器以等待绘图组在平铺之前完成组中的所有任务,如下所述:celery - chaining groups and subtasks. -> out of order execution

不过,问题在于 get_plot_task_group() 需要有关数据文件的信息来构建任务组,但 celery 会立即执行它,而不是在 download.s() 完成后执行。有没有办法告诉 celery 等待调用该函数?

我也尝试让 get_plot_task_group 本身成为一个返回组的任务,但我无法在链中或链完成后调用该组。我必须调用 result.get() 来获取组对象,并且在任务中调用 result.get() 是不好的做法。我也看不到使用回调的方法。

如果您有任何见解,我将不胜感激。

tl;dr:我必须在另一个任务中创建一组任务,然后运行它,但我不确定如何。

【问题讨论】:

  • 您是否在工作流程中的任务之间传递结果?

标签: python task celery chain


【解决方案1】:

将您的小组任务转换为和弦来实现这一目标。这是一个简单的例子。

@app.task
def dummy_task():
    return 'I am a dummy task'

@app.task
def add(x, y):
    return x + y

@app.task
def task_with_group_task():
    task1 = add.si(1, 2)
    group_task = group(add.si(i, i) for i in range(5))
    task2 = chord(group_task, dummy_task.si())
    task3 = add.si(9, 9)

    pipeline = chain(task1, task2, task3)()

这里 task2 仅在 task1 完成后启动,task3 仅在 task2 完成后启动。

注意:我不会将一项任务的结果传递给另一项任务。

【讨论】:

  • 你能以某种方式让task_with_group_task 在链完成后完成吗?
  • @shapiromatron 最好不要在一个任务中等待完成另一个任务。您可以创建task4 来完成剩余的工作,您可以将其添加到链中。
猜你喜欢
  • 2014-12-04
  • 2020-08-07
  • 2012-09-28
  • 2013-02-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多