好的,所以我想通了。以下是我的整个设置、设置以及如何运行 celery,对于那些可能想知道与我的问题相同的事情的人。
设置
CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'feeds',
'routing_key': 'long_tasks',
},
}
芹菜怎么跑?
终端 - 选项卡 1:
celery -A proj worker -Q default -l debug -n default_worker
这将启动第一个使用默认队列中的任务的工作人员。笔记! -n default_worker 对于第一个工作人员来说不是必须的,但如果您有任何其他 celery 实例启动并运行,则这是必须的。设置-n worker_name与--hostname=default@%h相同。
终端 - 选项卡 2:
celery -A proj worker -Q feeds -l debug -n feeds_worker
这将启动第二个工作人员,消费者从提要队列中执行任务。注意-n feeds_worker,如果您使用-l debug(日志级别=调试)运行,您将看到两个工作人员正在它们之间同步。
终端 - 选项卡 3:
celery -A proj beat -l debug
这将开始节拍,根据您的CELERYBEAT_SCHEDULE 中的时间表执行任务。
我不必更改任务或CELERYBEAT_SCHEDULE。
例如,这就是我的CELERYBEAT_SCHEDULE 对于应该进入提要队列的任务的外观:
CELERYBEAT_SCHEDULE = {
...
'update_feeds': {
'task': 'arena.social.tasks.Update',
'schedule': crontab(minute='*/6'),
},
...
}
如您所见,无需添加'options': {'routing_key': 'long_tasks'} 或指定它应该进入哪个队列。另外,如果你想知道为什么Update 是大写的,那是因为它是一个自定义任务,它被定义为celery.Task 的子类。
更新 Celery 5.0+
Celery 自第 5 版以来进行了一些更改,这是用于任务路由的更新设置。
如何创建队列?
Celery 可以自动创建队列。它非常适用于简单的情况,其中 celery 的路由默认值是可以的。
task_create_missing_queues=True 或者,如果您正在使用 django 设置并且您正在为 CELERY_ 键下的所有 celery 配置命名空间,CELERY_TASK_CREATE_MISSING_QUEUES=True。注意,默认是开启的。
自动计划任务路由
配置celery app后:
celery_app.conf.beat_schedule = {
"some_scheduled_task": {
"task": "module.path.some_task",
"schedule": crontab(minute="*/10"),
"options": {"queue": "queue1"}
}
}
自动任务路由
Celery 应用还是要先配置再配置:
app.conf.task_routes = {
"module.path.task2": {"queue": "queue2"},
}
任务的手动路由
如果您想动态路由任务,那么在发送任务时指定队列:
from module import task
def do_work():
# do some work and launch the task
task.apply_async(args=(arg1, arg2), queue="queue3")
更多关于路由的细节可以在这里找到:
https://docs.celeryproject.org/en/stable/userguide/routing.html
关于在这里调用任务:
https://docs.celeryproject.org/en/stable/userguide/calling.html