【问题标题】:RabbitMQ queue and routing keyRabbitMQ 队列和路由键
【发布时间】:2018-10-18 11:24:48
【问题描述】:

在文档中 https://docs.spring.io/spring-amqp/reference/htmlsingle/ 我明白了

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {

  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {

  }

这里有 1 个队列和 2 个另外的路由键,每个人都为他的方法 但是我的代码没有从密钥获取消息!

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = DRIVER_QUEUE, durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "order")
    )
    public String getOrders(byte[] message) throws InterruptedException {
         System.out.println("Rout order");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = DRIVER_QUEUE, durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "invoice")
    )
    public String getOrders(byte[] message) throws InterruptedException {
         System.out.println("Rout invoice");
    }

他们都从队列中获取消息,但看不到密钥... 站点使用“发票”键发送队列消息,我在控制台“路由顺序”中看到 什么问题??非常感谢!

rabbitmq 3.7.3 春天 4.2.9 org.springframework.amqp 1.7.5

【问题讨论】:

    标签: java spring rabbitmq queue


    【解决方案1】:

    错误是您将所有消息发送到同一个队列。

    您应该为每个侦听器使用不同的队列。您的绑定只是告诉 RK="invoice" 和 RK="order" 的消息必须在同一个队列中,而不是侦听器使用该 RK 处理队列元素。

    你应该绑定例如通过键“invoice”交换到 DRIVER_QUEUE1(例如“queue-orders”)并通过键“order”交换到 DRIVER_QUEUE2(例如“queue-invoices”)。通过这种方式,您可以分离消息,并且可以放置两个侦听器,一个用于发票,一个用于订单。例如。像这样:

    @RabbitListener(queues = "queue-orders")
    public void handleOrders(@Payload Order in,
          @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
       logger.info("Key: {}, msg: {}",key,in.toString());
    }
    
    
    @RabbitListener(queues = "queue-invoices")
    public void handleInvoices(@Payload Invoice in, 
          @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
       logger.info("Key: {}, msg: {}",key,in.toString());
    }
    

    我不喜欢整个完整的注释,因为当代理配置完成时,恕我直言,完整的注释变得无用(或者更好的是,添加额外的检查对我来说无用)。但如果你愿意,整个注释应该是这样的

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-orders", durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "invoice")
    )
    

    然后你就可以通过convertAndSend(exchangename, routingkey, object) 发送消息了

    Order order = new Order(...);
    rabbitTemplate.convertAndSend("exchange", "order", order);
    Invoice invoice = new Invoice(...);
    rabbitTemplate.convertAndSent("exchange", "invoice", invoice);
    

    如果您的启动应用程序实现了 RabbitListenerConfigurer,那么您可以将所有内容配置为例如

    @SpringBootApplication
    public class MyApplication implements RabbitListenerConfigurer {
       // other config stuff here....
    
        @Bean("queue1")
        public Queue queue1() {
            return new Queue("queue-orders", true);
        }
    
        @Bean("queue2")
        public Queue queue2() {
            return new Queue("queue-invoices", true);
        }
    
        @Bean
        public Binding binding1(@Qualifier("queue1") Queue queue, TopicExchange exchange) {        
            return BindingBuilder.bind(queue).to(exchange).with("invoice");
        }
    
        @Bean
        public Binding binding2(@Qualifier("queue2") Queue queue, TopicExchange exchange) {        
            return BindingBuilder.bind(queue).to(exchange).with("order");
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
            return rabbitTemplate;
        }
    
        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setMessageConverter(consumerJackson2MessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    
        // Exchange.
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("exchange");
        }
    }
    

    希望能满足您的要求。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-03-30
      • 2015-03-31
      • 2016-10-30
      • 1970-01-01
      • 2014-01-21
      • 1970-01-01
      • 2016-06-12
      • 1970-01-01
      相关资源
      最近更新 更多