【问题标题】:Getting list of all tasks from celery backend从 celery 后端获取所有任务的列表
【发布时间】:2017-03-16 13:56:22
【问题描述】:

我有一个 celery 应用程序,它使用后端来存储已完成任务的结果。

当任务排队/运行时,我可以获取有关它们的信息,但在它们完成后,如何从结果后端获取所有任务 ID 的列表?

我想在 python 应用程序中执行此操作,并且以不依赖于特定后端的方式执行此操作(例如,我将来可能希望在文件系统和 MySQL 之间切换作为结果存储)。

【问题讨论】:

  • 您只需要success 任务还是全部?你能解释一下它是什么吗?为什么你不能使用celery flower
  • @DanilaGanchar 理想的所有任务。我有一些生成报告的任务。在每周结束时,我想对一周内所有生成的报告运行一个整体报告(即获取成功/失败以及一周内运行的所有任务的结果)。我目前正在使用文件系统后端来获取结果,所以真的不想运行整个额外的 webapp(芹菜花)来列出/解析目录的内容。这是我过去在其他任务队列系统中做过的事情,所以想检查一下我是否错过了在 Celery 中执行此操作的方法。

标签: python celery


【解决方案1】:

我认为最好的解决方案是将有关已完成任务的信息存储在数据库中。首先,它将易于处理。 这里只是一个SQLite 的例子。我们的任务表:

# You can add specific columns for args, kwargs etc.
# it is just an example 
CREATE TABLE celery_tasks (
    "id" INTEGER PRIMARY KEY,
    "task_id" TEXT NOT NULL,
    "task_name" TEXT NOT NULL,
    "state" TEXT NOT NULL,
    "created" TEXT NOT NULL
)

我们的芹菜应用tasks.py:

import celery
from celery.signals import task_postrun
from celery.task import Task
import sqlite3
from datetime import datetime


@task_postrun.connect()
def task_postrun(signal=None, sender=None, task_id=None, task=None,
                 args=None, kwargs=None, retval=None, state=None):
    # For example we don't want to store info about specific tasks 
    ignored_tasks = ('tasks.ignore_task', )

    if task.name not in ignored_tasks:
        # write info about a finished task into SQLite
        conn = sqlite3.connect('db')
        conn.execute(
            'INSERT INTO celery_tasks (task_id, task_name, state, created) VALUES (?,?,?,?)',
            (task_id, task.name, state, datetime.now())
        )

        conn.commit()
        conn.close()


app = celery.Celery(
   'tasks',
   broker='redis://localhost:6379/0',
   backend='redis://localhost:6379/0',
)


@app.task
def success_task():
    pass


@app.task
def failure_task():
    raise Exception('something wrong')


@app.task
def ignore_task():
    """
    Example of the task that we want to ignore for SQLite.
    """
    pass

run_tasks.py:

from tasks import success_task, failure_task, ignore_task

success_task.delay()
failure_task.delay()
ignore_task.delay()

因此,在此之后,您可以使用常规 SQL 查询在代码的任何位置获取有关已完成任务的任何信息。(SELECT * FROM celery_tasks WHERE created ... AND ...)

你也可以不时清理桌子。 我认为使用 db 是一个很好的解决方案。

另一种方式。

您可以配置CELERY_RESULT_BACKEND 设置。在这种情况下,celery 将创建 celery_tasksetmetacelery_taskmeta 表。任务数据将自动实现:

app = Celery(
    'app_name',
    broker='CELERY_BROKER_URL...',
    backend='db+mysql://credentials...',
)

希望这会有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-21
    • 1970-01-01
    • 1970-01-01
    • 2013-09-17
    • 2012-09-21
    相关资源
    最近更新 更多