【问题标题】:Spring Boot RabbitMQ Single Queue with multiple consumers具有多个消费者的 Spring Boot RabbitMQ 单队列
【发布时间】:2021-06-28 07:07:09
【问题描述】:

我是 Rabbitmq 的新手,我想对多个消费者使用单个队列,请帮助我处理多个消费者,我们如何处理来自单个队列的多个消费者的请求?

下面是我的单队列和单消费者代码

我的配置类

@Configuration
public class ConfigureRabbitMq {

    public static final String EXCHANGE_NAME = "mikeexchange2";
    public static final String QUEUE_NAME = "mikequeue2";


    @Bean
    Queue createQueue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    @Bean
    TopicExchange exchange(){
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue q, TopicExchange exchange){
        return BindingBuilder.bind(q).to(exchange).with("mike.#");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory
            , MessageListenerAdapter messageListenerAdapter){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(messageListenerAdapter);
        return container;
    }


    @Bean
    MessageListenerAdapter listenerAdapter(Receive handler){
        return new MessageListenerAdapter(handler, "handleMessage");
    }
}

我的接收者类

@Service
public class Receive {

    public void handleMessage(String messageBody){
        System.out.println("HandleMessage!!!");
        System.out.println(messageBody);
        
    }

}

我的发件人类

@RestController
public class Send {

    private final  RabbitTemplate rabbitTemplate;
    
    public  Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    
    
    @RequestMapping(method = RequestMethod.GET, value = "/api/send/{msg}")
    public String sendMessage(@PathVariable("msg") String themessage){
        
        for(int i=0;i<5000000;i++) {
            rabbitTemplate.convertAndSend(ConfigureRabbitMq.EXCHANGE_NAME,
                    "mike.springmessages", themessage+""+Integer.toString(i));
        }
        return "We have sent a message! :" + themessage;
    }
}

【问题讨论】:

  • 你是指多个不同的消费者还是同一个消费者的多个实例?
  • @Nik 是多个不同的消费者,实际上当请求接收到不同消费者的路由取决于负载,换句话说,就像负载平衡
  • @Nik 所有消费者都将工作相同,它像负载均衡器一样工作,当请求它重定向到消费者1 和消费者2
  • 我觉得你的设置还是有点不清楚。通常通过运行同一服务的多个实例来实现多个消费者,这与您的“负载平衡”示例非常接近。但是,您实际上不可能选择负载最小的实例,因为没有控制逻辑。
  • 只需启动同一组件的多个实例即可(假设队列不是排他的)。但是,负载平衡是以循环方式完成的。

标签: java spring-boot rabbitmq spring-rabbit


【解决方案1】:

按照您的 cmets,您实际上还不错..但就像 Nik 所说,使用默认配置,您可以实现负载平衡循环。

如果您使用 spring-boot,请确保添加 spring-boot-starter-amqp 的依赖项。

在你的接收方,你通常会有这样的东西:

@Service
public class Receive {

 @RabbitListener(queues = "<queueName>", concurrency = <numConsumersPerInstance>)
 public void handleMessage(String messageBody){
   ...
 }

提供并发参数可以在一个服务实例中实现多线程消息消费。如果您真的希望一个实例同时只使用一条消息,您可以将其设置为 1。检查@RabbitListener 的源代码以获取有关并发的进一步指导。

【讨论】:

  • 另请参阅AbstractRabbitListenerContainerFactoryprefetch 选项,以确保您的实例一次不会消耗超过一个消息。默认是 250,所以默认情况下不是循环...