【问题标题】:Saving a celery task (for re-running) in database在数据库中保存芹菜任务(用于重新运行)
【发布时间】:2020-01-08 20:26:41
【问题描述】:

我们的工作流程目前是围绕旧版本的 celery 构建的,因此请记住,事情已经不是最佳的了。我们需要运行一个任务并将该任务运行的记录保存在数据库中。如果该任务失败或挂起(它经常发生),我们希望重新运行,就像它第一次运行一样。但这不应该自动发生。它需要根据故障的性质手动触发,并且需要将结果记录在数据库中以做出该决定(通过前端)。

我们如何在数据库中保存完整的任务记录,以便后续进程可以获取记录并运行新的相同任务?当前实现将@task 修饰函数的路径作为TaskInfo 模型的一部分保存在数据库中。当需要重新运行任务时,我们在TaskInfo 模型上有一个get_task() 方法,它从数据库中获取路径,使用getattr 导入它,另一个rerun() 方法使用@987654327 再次运行任务@(也保存在数据库中)。

像这样(这些是TaskInfo 模型实例上的方法):

def get_task(self):
    """Returns the task's decorated function, which can be delayed."""
    module_name, object_name = self.path.rsplit('.', 1)
    module = import_module(module_name)
    task = getattr(module, object_name)
    if inspect.isclass(task):
        task = task()
    # task = current_app.tasks[self.path]
    return task

 def rerun(self):
    """Re-run the task, and replace this one.

    - A new task is scheduled to run.
    - The new task's TaskInfo has the same parent as this TaskInfo.
    - This TaskInfo is deleted.
    """
    args, kwargs = self.get_arguments()
    celery_task = self.get_task()
    celery_task.delay(*args, **kwargs)
    defaults = {
        'path': self.path,
        'status': Status.PENDING,
        'timestamp': timezone.now(),
        'args': args,
        'kwargs': kwargs,
        'parent': self.parent,
    }
    TaskInfo.objects.update_or_create(task_id=celery_task.id, defaults=defaults)
    self.delete()

必须有一个更简洁的解决方案来将任务保存在数据库中以便稍后重新运行,对吧?

【问题讨论】:

  • 您的结果后端设置为什么?如果您的结果后端是数据库,那么结果,无论是否失败,都将写入数据库
  • 嘿...celery.backends.base.DisabledBackend,所以我猜是默认的,没有结果后端。我的猜测是最初的维护者想要编写一些自定义代码来重新运行任务和/或不知道如何在后端存储结果。结果后端是否包含足够的信息以在任务完成后重新运行任务?
  • 默认情况下,result_backend 只记录任务结果、回溯、状态等,因此缺少 args 和 kwargs,因此没有足够的信息来重新运行任务。

标签: django celery celery-task


【解决方案1】:

最新版本的 Celery (4.4.0) 包含一个参数 extended_result。可以设置为True,那么Result Backend Database中的表(默认为celery_taskmeta)将存储任务的args and kwargs

这是一个演示:

app = Celery('test_result_backend')

app.conf.update(
    broker_url='redis://localhost:6379/10',
    result_backend='db+mysql://root:passwd@localhost/celery_toys',
    result_extended=True
)


@app.task(bind=True, name='add')
def add(self, x, y): 
    self.request.task_name = 'add'  # For saving the task name.
    time.sleep(5)
    return x + y 

使用 MySQL 中记录的任务信息,您可以轻松地重新运行您的任务。

【讨论】:

  • 您不需要任何扩展结果。无论如何都会存储 Args 和 kwargs。
  • 是的,我明白了,我的意思是这样,它可以避免编写任何将任务元存储到数据库的逻辑。
猜你喜欢
  • 1970-01-01
  • 2019-02-26
  • 2020-08-07
  • 1970-01-01
  • 1970-01-01
  • 2018-01-08
  • 2021-11-11
  • 2012-08-30
  • 1970-01-01
相关资源
最近更新 更多