【问题标题】:RabbitMQ Consumer Design for Multiple Exchange-Queue Model多交换队列模型的 RabbitMQ 消费者设计
【发布时间】:2016-04-26 00:36:24
【问题描述】:

我有一个具有以下配置的 RabbitMQ 设置。

  • 每个 Exchange 都是 FANOUT 类型
  • 每个 Exchange 附加多个队列。
  • BlockingConnection 由消费者制作。
  • 单个消费者处理所有回调。

问题-

一些有效负载比其他负载需要更长的时间来处理,这导致消费者即使在其他队列中有有效负载时也保持空闲状态。

问题-

  1. 我应该如何实现消费者以避免长时间等待?我是不是该 为每个模块运行单独的消费者?任何用户体验?
  2. 我可以配置 RabbitMQ 来处理这些情况吗?如果是的话怎么办?

【问题讨论】:

    标签: rabbitmq message-queue rabbitmq-exchange


    【解决方案1】:

    首先很高兴知道为什么您有多个扇出交换?你真的需要这个吗?扇出交换将消息发送到所有队列...

    1. 只要有更多的消费者。检查this example from rabbitmq tutorial
    2. 您实际上并不需要显式配置rabbitmq,一切都可以通过客户端(发布者和订阅者)完成,您只需要弄清楚您需要多少个交换以及它们应该是哪种类型等。

    【讨论】:

      【解决方案2】:

      首先,您使用的是什么编程语言?大多数常用语言,如python、java、c#,都支持为并行进程创建额外的线程。

      假设您使用下面的队列(伪代码):

      def callback(ch, method, properties, body) ...
      def threaded_function(ch, method, properties, body) ...
      
      channel.basic_qos(prefetch_count=3)
      channel.basic_consume(callback, queue='task_queue')
      channel.start_consuming()
      

      首先,设置“prefetch_count=3”允许您的消费者同时拥有最多 3 条处于未确认状态的消息。

      在回调方法中,你应该使用线程函数启动一个线程来执行每条消息。在 threaded_function 方法体的最后,做:

      ch.basic_ack(delivery_tag = method.delivery_tag)
      

      这样,最多可以同时处理3条消息,即使其中一两个线程需要更长的时间运行,其他线程仍然可以处理下一条消息。

      【讨论】:

        猜你喜欢
        • 2012-07-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多