【问题标题】:Consumer allocation RabbitMQ消费者分配 RabbitMQ
【发布时间】:2014-04-28 14:32:46
【问题描述】:

在设计 rabbit-mq 消费者分布时需要帮助。

例如, 有 100 个队列和 10 个线程来使用这 100 个队列中的消息。 每个线程将使用来自 10 个队列的消息。 问题1:如何动态地将线程分配给队列?如果线程在不同的机器上运行?

从一个队列中消费的线程不能超过一个(以保持在相应队列中处理消息的顺序) 问题2:当系统运行时需要增加消费者线程时,怎么做?

【问题讨论】:

  • 您可以使用 ExecutorService (rabbitmq.com/api-guide.html) 将线程分配给消费者。它是什么意思“如果线程在不同的机器上运行?”我认为你可以解决只使用从所有消费者共享的 ThreadPoolExecutor 并动态调整它的大小。这个问题是通用的,有很多方法可以实现你想要的。只是一个 OT,我已经回答了你的一个问题 (*.com/questions/23333863/…),非常感谢您的反馈。
  • 你能分享一个例子吗?如果我们委托给 ExecutorService ,我们如何确保处理消息的顺序? “如果线程在不同的机器上运行” - 例如。在这 10 个线程中,5 个来自 NodeA,5 个来自 NodeB

标签: rabbitmq distribution consumer


【解决方案1】:

关于消息顺序(FIFO)的帖子很多,在正常情况下(一个生产者一个消费者没有网络问题)你没有任何问题。但是你可以读到here

特别注意“除非设置了重新传递的字段”条件, 这意味着消费者的任何断开连接都可能导致消息挂起 确认随后将无序交付。

此外,例如,如果您发布了一条消息,并且在发布过程中出现了一些错误,您必须以正确的顺序重新发布该消息。 这意味着如果您绝对需要消息顺序,则必须实现它,例如用序列号标记每个数据包,并且您还应该实现确认发布。

我认为,但这是我的观点,当您使用消息系统时,您不应该担心消息顺序,因为它应该是您的应用程序能够管理数据。

话虽如此,如果我们假设 100 个队列必须处理相同的消息类型,您可以使用 ThreadPoolExecutor 并从所有消费者共享它。 例如:

public class ActualConsumer extends DefaultConsumer { 
public ActualConsumer(Channel channel) { 
super(channel); 
} 
@Override 
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws java.io.IOException { 
MyMessage message = new MyMessage(body); 
    mythreadPoolExecutorShared.submit(new MyHandleMessage(message))
} 

}

通过这种方式,您可以平衡线程之间的消息。 此外,对于 threadPool,您可以使用不同的策略,例如具有固定线程数的静态分配或动态线程分配。
请阅读这篇关于线程池调整大小的帖子 (Can you dynamically resize a java.util.concurrent.ThreadPoolExecutor while it still has tasks waiting) 您可以将此模式应用于所有节点,这样您就可以平衡调度消息并分配正确的线程数。

我希望它有用,我想更详细一点,但你的问题有点笼统。

【讨论】: