【问题标题】:Celery 4.1 periodic tasks errorCelery 4.1 周期性任务错误
【发布时间】:2018-08-03 13:02:04
【问题描述】:

我正在尝试将任务设置为每十秒运行一次。使用 Celery Beat。

我正在使用:

 Django==1.11.3
 celery==4.1.0
 django-celery-beat==1.1.1
 django-celery-results==1.0.1

它给了我以下错误:

收到“operations.tasks.message”类型的未注册任务

我是 Celery 的新手,我尝试了很多解决方案,但似乎找不到解决方案,不胜感激

settings.py

CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Johannesburg'
CELERY_BEAT_SCHEDULE = {
        'message': {
        'task': 'operations.tasks.message',
        'schedule': 10.0
    }
    }

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nodiso.settings')

app = Celery('nodiso')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

task.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
from operations import models
from .celery import periodic_task



@task
def message():
    t = models.Celerytest.objects.create(Message='Hello World')
    t.save()

文件结构

proj-
     proj-
         __init__.py
         settings.py-
         celery.py-
     app-
         tasks.py-

【问题讨论】:

  • IIRC Celery taks 名称(默认情况下)是方法名称,因此您的任务可能仅使用名称“message”注册,而不是配置中的“operations.tasks.message”。
  • 您的文件名为task.py(至少在您的问题描述中),但您提供的是此路径而不是operations.tasks.message,请注意S。另外,您正在使用装饰器@task,但它没有定义(导入)

标签: python django celery celerybeat


【解决方案1】:

在我的 celery.py 文件中,我这样定义 app

app = Celery(
    'your_celery_app_name',
    include=[
        'your_celery_app_name.module.task1',
        'your_celery_app_name.module.task2',
    ]
)
app.config_from_object('your_celery_app_name.celeryconfig')

我的celeryconfig.py 是我定义我的节拍和其他设置的地方(我认为这与您的 settings.py 相同)。

以下内容可能不相关——我不是 Python 方面的专家,也不是如何组合包的专家——但根据我有限的理解,您的任务应该是 celery 应用程序模块的子模块。不过要加少许盐。

我的项目结构看起来更像这样:

your_celery_app_name (dir)
    setup.py (file)
    your_celery_app_name (dir)
        __init__.py (file)
        celery.py (file)
        celeryconfig.py (file)
        module (dir)
            __init__.py (importing task1 and task2 from tasks)
            tasks.py (implementing task1 and task2)

【讨论】:

    最近更新 更多