【发布时间】:2021-11-05 21:00:53
【问题描述】:
我有“储物柜”装饰器:
def lock_task(func):
def wrapper(*args, **kwargs):
if redis.set(func.__name__, 'lock', nx=True):
try:
result = func(*args, **kwargs)
finally:
redis.delete(func.__name__)
return result or True
else:
return 'Skipped'
return wrapper
我的装饰师也有 celery-task:
@celery_app.task
@lock_task
def test():
call_command('test')
还有我的 celery-beat 设置:
celery_app.conf.beat_schedule = {
'test': {
'task': 'project.celery.test',
'schedule': crontab(minute='*/1')
}
}
启动后,我收到 KeyError Received unregistered task of type 'project.celery.test'。
如何正确地调用这个构造?
【问题讨论】:
-
我添加了一个刚刚更新的答案。你能检查一下
functools.wraps()的使用是否有效吗?
标签: python django redis celery