【问题标题】:Celery&&RabbitMQ how to Publish message to exchange?Celery&&RabbitMQ 如何发布消息进行交换?
【发布时间】:2018-12-31 14:02:18
【问题描述】:

现在,我想将register 事件发布到某个特殊的交易所,我可以使用 celery 远程检索和处理它。

其实我已经使用send_task函数实现了这个,但是它必须通过task_name来指示应该执行哪个任务并消费它。所以这对我的目标来说似乎不是那么完美。

我想要的就是这样:

  1. register消息发布到某个Exchange
  2. 远程机器1订阅此topicroute_key并捕获消息,用于执行任务;
  3. 远程机器 2- 与机器 1 相同,但执行另一个任务- 接收(可能需要回复某些 queue

例如,就像这样的工作流程:

注册:

  • 发送电子邮件
  • 生成信息

    ......

【问题讨论】:

    标签: rabbitmq celery


    【解决方案1】:

    如果我需要非标准交换,我会这样做。

    在我的 celeryconfig 中,我指定了该交换并像这样为其分配队列(在我的情况下,我需要 fanout 交换):

    from kombu.common import Broadcast
    from kombu import Exchange, Queue
    CELERY_QUEUES = (
       Broadcast(name='queue_name', exchange=Exchange('queue_name', type='fanout')),
    )
    

    然后我用 celery multi 生成 worker 并将其分配给我的特定队列,如下所示:

    celery multi start 1 -A my_project -Q:1 queue_name -c:1 1 (other options go here)
    

    然后我可以像这样将我的任务插入到该队列中:

    from my_project import my_fancy_task
    my_fancy_task.apply_async(args=(x, y, z), queue="my_queue")
    

    我不太了解您的具体用例,如果您需要一台主机上的工作人员使用一个队列中的任务,然后另一台主机上的工作人员使用另一个队列中的任务,那么只需将您的任务分成两个队列并配置每个主机启动工作人员并将他们分配到对您有意义的任何队列。也许这会有所帮助:Topic exchange ambiguity with RabbitMQ

    【讨论】:

    • 感谢您的慷慨帮助,我确实想让生产者和消费者分开,这样我就可以发布一条消息并由不同的主机消费。在您的解决方案中,我认为我必须调用某个任务,这与我一次调用多个任务的目标相反。昨天,我找到了一个只使用 Celery 的较低 api docs.celeryproject.org/en/latest/userguide/…> 的解决方案,尽管它看起来并不漂亮,但它在一定程度上对我有用。
    • 在这种情况下,您不必担心交换,只需在第二台主机上启动您的工作人员,并确保该工作人员将连接到与您的生产者相同的rabbitmq 队列。我个人在我的主机上运行集群 rabbitmq,但您不必这样做 - 您只需在一台主机上运行 rabbitmq 并将消费者和发布者连接到它。当您运行您的消费者时,请确保您使用-Q 选项指定您的特定队列。我仍然很难理解这一点:my goal that call multiple task at once - 你是什么意思?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-07-13
    • 2019-12-27
    • 1970-01-01
    • 1970-01-01
    • 2018-12-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多