【问题标题】:celery scheduling task periodically but not executing themcelery 定期调度任务但不执行它们
【发布时间】:2025-02-26 04:30:01
【问题描述】:

我的 django 应用有两个任务,其中一个是周期性任务。

正常任务:AddScore 周期性任务:CalculateTopScore

class CalculateTopScore(celery.Task):

    default_retry_delay = settings.DEFAULT_RETRY_DELAY
    max_retries = settings.DEFAULT_MAX_RETRIES
    name = 'games.tasks.CalculateTopScore'

    def run(self):
        try:
            # Code to run

        except Exception, err:
            logger.exception("Error in running task calculate_top_score")
            self.retry(exc=err)

        return True

    def on_failure(self, exc, task_id):

        failure = "%s task for calculate_top_score failed permanently." % task_id
        logger.error(failure)

    def on_success(self):

        task_info = 'calculate_top_score task successfully'
        logger.info(task_info)

我想每 30 分钟定期执行一次这个任务。

这是我正在使用的设置:

#celery/settings.py

import djcelery
import kombu

from celery.schedules import crontab
from config.celery import exchanges


djcelery.setup_loader()

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

CELERYBEAT_SCHEDULE = {
    "calcluate_score": {
        "task": "games.tasks.CalculateTopScore",
        "schedule": crontab(minute='*/30'),
        "args": (),
    },
}

CELERY_QUEUES = (
    kombu.Queue('add_score',
                exchange=exchanges.add_score_exchange,
                routing_key='add.scores'),
    kombu.Queue('calcluate_score',
                 exchange=exchanges.calcluate_score_exchange,
                 routing_key='calculate.scores'),
)

CELERY_ROUTES = ('config.celery.routers.CeleryTaskRouter',)

# Default delay(in seconds) for retrying tasks.
DEFAULT_RETRY_DELAY = 60

# Maximum retry count
DEFAULT_MAX_RETRIES = 6

CELERY_IGNORE_RESULT = True

exchanges.py 文件

#exchanges.py

from kombu import Exchange

add_score_exchange = Exchange('add_score', type='direct')
calcluate_score_exchange = Exchange('calcluate_score', type='direct')

routes.py 文件

ROUTES = {
    'players.tasks.AddScore': {
        'exchange': 'add_score',
        'exchange_type': 'direct',
        'routing_key': 'add.score',
    },
   'games.tasks.CalculateTopScore': {
        'exchange': 'calculate_score',
        'exchange_type': 'direct',
        'routing_key': 'calculate.score',
    },
}


class CeleryTaskRouter(object):
    """ This is a basic celery task router.
    """

    def route_for_task(self, task, arg=None, kwargs=None):
        return ROUTES.get(task)

在我们的生产服务器上,我使用以下参数运行 celerydceleryd worker -B

现在在日志中我观察到 celerybeat 每 30 分钟安排一次任务,但工作人员根本不知道安排的任务,因此它没有执行。

为什么?缺少任何配置/设置吗?如何定期执行基于类的任务?

请帮忙

【问题讨论】:

  • 关于这件事的任何更新?我也有一个没有执行的周期性任务,所有其他类型的任务都可以。

标签: python django scheduled-tasks celery django-celery


【解决方案1】:

您可能必须指定您希望 celery 从中获取任务的队列。默认是使用celery 队列。

试试:

celeryd worker -B -Q add_score,calculate_score

【讨论】:

  • 但是add_Score任务是由视图内部的一些函数触发的。所以正确执行。我还需要指定add_score 任务吗?还是应该在队列中指定我要定期运行的calculate_score