【问题标题】:Django/Celery multiple queues on localhost - routing not working本地主机上的 Django/Celery 多个队列 - 路由不起作用
【发布时间】:2014-06-01 12:54:00
【问题描述】:

我跟着 celery docs 在我的开发机器上定义了 2 个队列。

我的芹菜设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

我在项目的 virtualenv 中打开了两个终端窗口,并运行了以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

我得到的是两个队列都在处理所有任务。

我的目标是让一个队列仅处理CELERY_ROUTES 中定义的一项任务,并使用默认队列处理所有其他任务。

我也关注了这个SO questionrabbitmqctl list_queues 返回celery 0,运行rabbitmqctl list_bindings 两次返回exchange celery queue celery []。重启rabbit服务器并没有改变任何东西。

【问题讨论】:

    标签: python django celery celerybeat


    【解决方案1】:

    好的,所以我想通了。以下是我的整个设置、设置以及如何运行 celery,对于那些可能想知道与我的问题相同的事情的人。

    设置

    CELERY_TIMEZONE = TIME_ZONE
    CELERY_ACCEPT_CONTENT = ['json', 'pickle']
    CELERYD_CONCURRENCY = 2
    CELERYD_MAX_TASKS_PER_CHILD = 4
    CELERYD_PREFETCH_MULTIPLIER = 1
    
    # celery queues setup
    CELERY_DEFAULT_QUEUE = 'default'
    CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
    CELERY_DEFAULT_ROUTING_KEY = 'default'
    CELERY_QUEUES = (
        Queue('default', Exchange('default'), routing_key='default'),
        Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
    )
    CELERY_ROUTES = {
        'arena.social.tasks.Update': {
            'queue': 'feeds',
            'routing_key': 'long_tasks',
        },
    }
    

    芹菜怎么跑?

    终端 - 选项卡 1:

    celery -A proj worker -Q default -l debug -n default_worker
    

    这将启动第一个使用默认队列中的任务的工作人员。笔记! -n default_worker 对于第一个工作人员来说不是必须的,但如果您有任何其他 celery 实例启动并运行,则这是必须的。设置-n worker_name--hostname=default@%h相同。

    终端 - 选项卡 2:

    celery -A proj worker -Q feeds -l debug -n feeds_worker
    

    这将启动第二个工作人员,消费者从提要队列中执行任务。注意-n feeds_worker,如果您使用-l debug(日志级别=调试)运行,您将看到两个工作人员正在它们之间同步。

    终端 - 选项卡 3:

    celery -A proj beat -l debug
    

    这将开始节拍,根据您的CELERYBEAT_SCHEDULE 中的时间表执行任务。 我不必更改任务或CELERYBEAT_SCHEDULE

    例如,这就是我的CELERYBEAT_SCHEDULE 对于应该进入提要队列的任务的外观:

    CELERYBEAT_SCHEDULE = {
        ...
        'update_feeds': {
            'task': 'arena.social.tasks.Update',
            'schedule': crontab(minute='*/6'),
        },
        ...
    }
    

    如您所见,无需添加'options': {'routing_key': 'long_tasks'} 或指定它应该进入哪个队列。另外,如果你想知道为什么Update 是大写的,那是因为它是一个自定义任务,它被定义为celery.Task 的子类。

    更新 Celery 5.0+

    Celery 自第 5 版以来进行了一些更改,这是用于任务路由的更新设置。

    如何创建队列?

    Celery 可以自动创建队列。它非常适用于简单的情况,其中 celery 的路由默认值是可以的。

    task_create_missing_queues=True 或者,如果您正在使用 django 设置并且您正在为 CELERY_ 键下的所有 celery 配置命名空间,CELERY_TASK_CREATE_MISSING_QUEUES=True。注意,默认是开启的。

    自动计划任务路由

    配置celery app后:

    celery_app.conf.beat_schedule = {
      "some_scheduled_task": {
        "task": "module.path.some_task",
        "schedule": crontab(minute="*/10"),
        "options": {"queue": "queue1"}
      }
    }
    

    自动任务路由

    Celery 应用还是要先配置再配置:

    app.conf.task_routes = {
      "module.path.task2": {"queue": "queue2"},
    }
    

    任务的手动路由

    如果您想动态路由任务,那么在发送任务时指定队列:

    from module import task
    
    def do_work():
      # do some work and launch the task
      task.apply_async(args=(arg1, arg2), queue="queue3")
    

    更多关于路由的细节可以在这里找到: https://docs.celeryproject.org/en/stable/userguide/routing.html

    关于在这里调用任务: https://docs.celeryproject.org/en/stable/userguide/calling.html

    【讨论】:

    • 我在 celery 3.1 上使用了新格式 'task_queue'。正在使用“CELERY_ROUTES”。
    【解决方案2】:

    除了接受的答案之外,如果有人来到这里仍然想知道为什么他的设置不起作用(就像我刚才所做的那样),原因如下:celery 文档没有正确列出设置名称。

    对于 celery 5.0.5 设置,CELERY_DEFAULT_QUEUECELERY_QUEUESCELERY_ROUTES 应命名为 CELERY_TASK_DEFAULT_QUEUECELERY_TASK_QUEUESCELERY_TASK_ROUTES。这些是我测试过的设置,但我猜同样的规则也适用于交换和路由密钥。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-09-20
      • 2016-03-02
      • 2019-01-08
      • 2014-05-29
      相关资源
      最近更新 更多