【问题标题】:Run multiple Celery tasks using a topic exchange使用主题交换运行多个 Celery 任务
【发布时间】:2016-07-20 22:19:06
【问题描述】:

我正在用 Celery 替换一些本地代码,但很难复制当前的行为。我想要的行为如下:

  • 创建新用户时,应使用user.created 路由键将消息发布到tasks 交换。
  • 此消息应触发两个 Celery 任务,即 send_user_activate_emailcheck_spam

我尝试通过使用ignore_result=True 参数定义user_created 任务以及send_user_activate_emailcheck_spam 的任务来实现这一点。

在我的配置中,我添加了以下路由和队列定义。当消息传递到user_created 队列时,它不会传递到其他两个队列。

理想情况下,消息仅传递到send_user_activate_emailcheck_spam 队列。使用 vanilla RabbitMQ 时,消息会发布到交换器,队列可以绑定到交换器,但 Celery 似乎直接将消息传递到队列。

如何在 Celery 中实现上述行为?

CELERY_QUEUES = {
    'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}

CELERY_ROUTES = {
    'user_created': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'send_user_activate_email': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'check_spam': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
}

【问题讨论】:

    标签: python rabbitmq celery message-queue messaging


    【解决方案1】:

    设计和解决问题的简单方法是使用 Celery 工作流程。
    但首先我会更改您的队列定义,为每个任务设置一个唯一的路由键,并使用“直接”值设置 exchange_type。

    根据celery documentation直接交换通过精确的路由键匹配,因此我们为所有自定义任务和消费者队列设置相同的交换,我们映射 routing_key(用于任务)和 binding_key(用于队列),如下一个sn-p:

    CELERY_QUEUES = {
        'user_created': {'binding_key':'user_created', 'exchange': 'tasks', 'exchange_type': 'direct'},
        'send_user_activate_email': {'binding_key':'send_user_activate_email', 'exchange': 'tasks', 'exchange_type': 'direct'},
        'check_spam': {'binding_key':'check_spam', 'exchange': 'tasks', 'exchange_type': 'direct'},
    }
    
    CELERY_ROUTES = {
        'user_created': {
            'queue': 'user_created',
            'routing_key': 'user_created',
            'exchange': 'tasks',
            'exchange_type': 'direct',
        },
        'send_user_activate_email': {
            'queue': 'send_user_activate_email',
            'routing_key': 'send_user_activate_email',
            'exchange': 'tasks',
            'exchange_type': 'direct',
        },
        'check_spam': {
            'queue': 'check_spam',
            'routing_key': 'check_spam',
            'exchange': 'tasks',
            'exchange_type': 'direct',
        },
    }
    

    完成此更改后,您需要对可用列表 (http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives) 使用正确的工作流程。阅读您的问题,我认为您需要一条链,因为需要保留顺序。

    sequential_tasks = []
    sequential_tasks.append(user_created.s(**user_created_kwargs))
    sequential_tasks.append(send_user_activate_email.s(**send_user_activate_email_kwargs))
    sequential_tasks.append(check_spam.s(**check_spam_kwargs))
    #you can add more tasks to the chain
    chain(*sequential_tasks)()
    

    Celery 将透明地处理与队列相关的工作。

    【讨论】:

    • 您能解释一下为什么我需要为每个任务单独交换吗? send_user_activate_email 和 check_spam 任务可以并行运行,如果这很重要的话。
    【解决方案2】:

    听起来您希望单个消息触发/被两个队列消费,但这不是 Celery 的工作方式。一个 Exchange 会将一个任务发布到符合条件的队列,但是一旦它被使用,其他队列就会忽略该消息。每个要触发的任务都需要一条消息。

    Celery 新用户经常会感到困惑,因为在这个系统中“队列”有两种用途; Queue() 和文档引用的 Kombu 队列,以及直接保存消息并由工作人员使用的 AMQP 队列。当我们发布到队列时,我们会想到 AMQP 队列,这是不正确的。 (感谢下面链接的答案)。

    回到你的问题,如果我理解正确的话,当 user_created 被消费时,你希望它产生另外两个任务; send_user_activate_email 和 check_spam。此外,它们不应相互依赖;它们可以在不同的机器上并行运行,并且不需要知道彼此的状态。

    在这种情况下,您希望 user_created “应用异步”这两个新任务并返回。这可以直接完成,或者您可以使用包含 check_spam 和 send_user_activate_email 的 Celery“组”来实现此目的。该小组提供了一些很好的速记,并为您的任务提供了一些结构,所以我个人会向您推荐这个方向。

    #pseudocode
    group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
    

    此设置将创建四个消息;一个用于您要执行的每个任务加上一个用于 Group(),它本身就会有一个结果。

    在您的情况下,我不确定 Exchange 或 ignore_result 是否必要,但我需要查看任务代码并更多地了解系统才能做出判断。

    http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys Why do CELERY_ROUTES have both a "queue" and a "routing_key"?

    (如果我离开了,我会删除/删除答案...)

    【讨论】:

    • 感谢您的详尽解释。从您的答案和文档中收集,Celery 使用 routing_key 在工作人员之间分配任务,而不是让多个任务响应单个消息。这基本上迫使您将触发任务和处理任务的代码紧密耦合。这是正确的吗?
    • @joelcox,我认为这是一个很好的总结。这条规则的例外是 Map() 和 Starmap(),我相信它们为序列中的每个元素执行一个任务,但只发送一条消息。如果您希望任务相互响应(例如,等待另一个成功,因为它需要元数据才能继续),您还可以查看 Chain()、Chord()。
    猜你喜欢
    • 2018-03-21
    • 2011-09-15
    • 1970-01-01
    • 2021-02-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-24
    相关资源
    最近更新 更多