【问题标题】:celery tasks queue not working with rabbitmq芹菜任务队列不适用于rabbitmq
【发布时间】:2017-09-15 13:41:28
【问题描述】:

Celery 任务在没有队列的情况下成功执行

设置。

BROKER_URL = "amqp://user:pass@localhost:5672/test"
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'

CELERYD_TASK_SOFT_TIME_LIMIT = 60
CELERY_IGNORE_RESULT = True

@app.task
def test(a,b,c):
    print("doing something here...")

命令

celery worker -A proj -E -l INFO

上面的 setup worker 执行成功了。

我已经在 celery 任务中引入了队列。

使用之前的设置添加配置

from kombu.entity import Exchange, Queue

CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
)

CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'

CELERY_ROUTES = {
    'myapp.tasks.test': {'queue': 'high'},
}

命令

celery worker -A proj -E -l INFO -n worker.high -Q high

打电话

 test.delay(1, 2, 3)

当我执行时队列工作器没有运行。我错过了任何配置吗?

【问题讨论】:

  • 兔子和芹菜原木里面有什么信息吗?
  • 我没有添加任何芹菜原木。我会添加它。我需要在 rabbitmq 上为 celery 配置队列名称吗?
  • 你不必对 rabbit 做任何事情,只要确保 rabbit 正在运行并监听那个端口和接口,如果你在一台 linux 机器上并且可以访问 shell 运行 netstat -putan | grep 5672
  • 是的,跑步和聆听。
  • 当我添加 @app.task(queue='high') 时它起作用了。

标签: python rabbitmq celery django-celery


【解决方案1】:

将 CELERY_ROUTES 更改为 CELERY_TASK_ROUTES- 在版本 4 中更改

【讨论】:

    【解决方案2】:

    首先,确保在 rabbit 和 high worker 日志中都建立了连接。

    然后,尝试将您的CELERY_ROUTES 更改为:

    CELERY_ROUTES = {
        'myapp.tasks.test': {
            'exchange': 'high',
            'exchange_type': 'high',
            'routing_key': 'high'
        }
    }
    

    或者用queue调用任务,例如:

    test_task = test.signature(args=(1, 2, 3), queue='high', immutable=True)
    test_task.apply_async()
    

    【讨论】: