【问题标题】:Celery dynamic queue creation and routingCelery 动态队列创建和路由
【发布时间】:2012-08-09 11:52:17
【问题描述】:

我正在尝试调用一个任务并为该任务创建一个队列,如果它不存在,然后立即将调用的任务插入该队列。我有以下代码:

@task
def greet(name):
    return "Hello %s!" % name


def run():
    result = greet.delay(args=['marc'], queue='greet.1',
        routing_key='greet.1')
    print result.ready()

然后我有一个自定义路由器:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None

这将创建一个名为greet.1 的交换和一个名为greet.1 的队列,但队列为空。交换应该只是称为greet,它知道如何将greet.1 之类的路由密钥路由到名为greet.1 的队列。

有什么想法吗?

【问题讨论】:

    标签: python rabbitmq celery


    【解决方案1】:

    当您执行以下操作时:

    task.apply_async(queue='foo', routing_key='foobar')
    

    然后 Celery 将从 CELERY_QUEUES 中的 'foo' 队列中获取默认值, 或者如果它不存在,则使用 (queue=foo, exchange=foo, routing_key=foo) 自动创建它

    因此,如果 CELERY_QUEUES 中不存在“foo”,您最终会得到:

    queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')
    

    然后生产者将声明该队列,但是由于您覆盖了 routing_key, 实际使用routing_key = 'foobar'发送消息

    这可能看起来很奇怪,但这种行为实际上对主题交换很有用, 发布到不同主题的地方。

    虽然很难做你想做的事,你可以自己创建队列 并声明它,但这不适用于自动消息发布重试。 如果 apply_async 的 queue 参数可以支持会更好 一个自定义的kombu.Queue 将被声明并用作目的地。 也许你可以在http://github.com/celery/celery/issues

    上打开一个问题

    【讨论】:

    • 我不再担心手动创建队列,而是产生一个新的工作人员来自动创建队列和交换,这对我的问题更有意义。一如既往,感谢您的回复。 :)
    猜你喜欢
    • 2013-09-14
    • 2011-12-15
    • 2017-03-30
    • 1970-01-01
    • 2012-04-22
    • 1970-01-01
    • 2015-06-16
    • 2020-06-08
    • 1970-01-01
    相关资源
    最近更新 更多