【问题标题】:How to run celery-beat task under decorator?如何在装饰器下运行 celery-beat 任务?
【发布时间】:2021-11-05 21:00:53
【问题描述】:

我有“储物柜”装饰器:

def lock_task(func):
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        else:
            return 'Skipped'
    return wrapper

我的装饰师也有 celery-task:

@celery_app.task
@lock_task
def test():
    call_command('test')

还有我的 celery-beat 设置:

celery_app.conf.beat_schedule = {
    'test': {
        'task': 'project.celery.test',
        'schedule': crontab(minute='*/1')
    }
}

启动后,我收到 KeyError Received unregistered task of type 'project.celery.test'。

如何正确地调用这个构造?

【问题讨论】:

  • 我添加了一个刚刚更新的答案。你能检查一下functools.wraps() 的使用是否有效吗?

标签: python django redis celery


【解决方案1】:

似乎 wrapper 函数将是注册到 celery 的函数,而不是实际的 test 函数。如果你在启动 celery worker 的时候看到这个日志就可以验证一下:

$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
  . project.celery.wrapper

要注册实际任务的名称,请使用functools.wraps(),如文档所述:

如果不使用这个装饰器工厂,示例函数的名称将是 'wrapper'

from functools import wraps

def lock_task(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
    ...

如果错误仍然存​​在,请确保您已正确配置:

  • 芹菜imports 例如celery_app.conf.update(imports=['project.celery'])celery_app.conf.imports = ['project.celery']
  • 或 Celery include (example) 例如celery_app = Celery(..., include=['project.celery'])

为了验证,您应该在启动 celery worker 时看到名为 project.celery.test 的任务(要强调的是,worker,而不是 scheduler):

$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
  . project.celery.test
  • 见最后一行。如果您使用标志--loglevel=INFO 调用worker,它应该是可见的。如果您没有在其中看到任务 test 或看到 wrapper,那么上述步骤可能会有所帮助。

【讨论】:

    【解决方案2】:

    问题已通过将名称添加到装饰器中来解决

    @celery_app.task(name='TASKNAME')
    @lock_task
    def test():
        call_command('test')
    

    然后将此名称设置为调度程序:

    celery_app.conf.beat_schedule = {
        'test': {
            'task': 'TASKNAME',
            'schedule': crontab(minute='*/1')
        }
    }
    

    它对我有用。

    【讨论】:

      最近更新 更多