【问题标题】:celery feature "reply_to" don't works as expected芹菜功能“reply_to”没有按预期工作
【发布时间】:2017-02-25 02:36:29
【问题描述】:

我需要配置 celery 应该将任务执行结果放入哪个队列,我使用这种方式作为described in documentation(项目“reply_to”):

@app.task(reply_to='export_task')  # <= configured right way
def test_func():
    return "here is result of task"

预期行为

任务结果应该在名为“export_task”的队列中(在装饰器中配置)

实际行为

任务结果位于队列中,名称如下: d5587446-0149-3133-a3ed-d9a297d52a96


芹菜报告:

python -m celery -A my_worker report

software -> celery:3.1.24 (Cipater) kombu:3.0.37 py:3.5.1
            billiard:3.3.0.23 py-amqp:1.4.9
platform -> system:Windows arch:64bit, WindowsPE imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:rpc:///

CELERY_ACCEPT_CONTENT: ['json']
CELERY_RESULT_BACKEND: 'rpc:///'
CELERY_QUEUES:
    (<unbound Queue main_check -> <unbound Exchange main_check(direct)> -> main_check>,)
CELERYD_CONCURRENCY: 10
CELERY_TASK_SERIALIZER: 'json'
CELERY_RESULT_PERSISTENT: True
CELERY_ROUTES: {
 'my_worker.test_func': {'queue': 'main_check'}}
BROKER_TRANSPORT: 'amqp'
CELERYD_MAX_TASKS_PER_CHILD: 3
CELERY_RESULT_SERIALIZER: 'json'

复制步骤

请创建项目文件。

celery_app.py

from celery import Celery
from kombu import Exchange, Queue

app = Celery('worker')

app.conf.update(
    CELERY_ROUTES={
        'my_worker.test_func': {'queue': 'main_check'},
    },
    BROKER_TRANSPORT='amqp',
    CELERY_RESULT_BACKEND='rpc://',
    CELERY_RESULT_PERSISTENT=True,
    # CELERY_DEFAULT_DELIVERY_MODE='persistent',
    # CELERY_RESULT_EXCHANGE='export_task',
    CELERYD_CONCURRENCY=10,
    CELERYD_MAX_TASKS_PER_CHILD=3,
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],

    CELERY_QUEUES=(
        Queue('main_check', Exchange('main_check', type='direct'), routing_key='main_check'),
    ),
)

my_worker.py:

from celery_app import app

@app.task(reply_to='export_task')
def test_func():
    return "here is result of task"

然后开始芹菜:

python -m celery -A my_worker worker --loglevel=info

然后在 python 调试控制台中添加新任务:

from my_worker import *
result = test_func.delay()

我要求在official issue tracker 上提供帮助,但没人在乎。

【问题讨论】:

  • +1 表示没人关心。如果你想用 celery 做一些真正的事情,那么文档就是可怕的,当你提出问题时没人在乎。
  • 是的。如果你感兴趣的话,我雇了一个解决部分问题的人,在线程的末尾github.com/celery/celery/issues/3848
  • 有同样的问题。它还没有解决...你解决了这个问题吗?
  • @SebastianWozny - 如您所知,这是一个开源项目,所以如果文档中有您不喜欢的内容,请务必提交 PR!

标签: python rabbitmq celery


【解决方案1】:

我没有在您的代码中看到该队列 (export_task) 的声明位置。

【讨论】:

猜你喜欢
  • 2013-07-02
  • 2021-12-26
  • 2015-02-23
  • 1970-01-01
  • 2021-02-25
  • 1970-01-01
  • 2013-08-29
  • 2013-04-28
  • 2021-01-08
相关资源
最近更新 更多