【发布时间】:2025-09-17 03:05:02
【问题描述】:
我需要一个最低限度的示例来执行周期性任务(每 5 分钟运行一些函数,或者在 12:00:00 等运行一些函数)。
在我的myapp/tasks.py,我有,
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery import task
@periodic_task(run_every=(crontab(hour="*", minute=1)), name="run_every_1_minutes", ignore_result=True)
def return_5():
return 5
@task
def test():
return "test"
当我运行 celery workers 时,它会显示任务(如下所示),但不返回任何值(在终端或花中)。
[tasks]
. mathematica.core.tasks.test
. run_every_1_minutes
请提供最少的示例或提示以达到预期结果。
背景:
我有一个config/celery.py,其中包含以下内容:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
在我的config/__init__.py,我有
from .celery import app as celery_app
__all__ = ['celery_app']
我在myapp/tasks.py中添加了一个类似下面的函数
from celery import task
@task
def test():
return "test"
当我从shell运行test.delay()时,它运行成功并且在flower中也显示了任务信息
【问题讨论】:
-
如何运行worker?您是否使用 -B 选项:
celery -A proj worker -B? -
这样,
celery -A config worker -l info