【问题标题】:Spring rabbitmq send to exchange with dynamic bindingSpring rabbitmq 通过动态绑定发送到交换
【发布时间】:2016-02-11 10:37:48
【问题描述】:

我尝试使用 TopicExchange 来屏蔽消息。

配置:

    <rabbit:connection-factory id="connectionFactory"  host="localhost" username="guest" password="guest"/>

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<rabbit:queue name="sample.queue"/>

<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>

<rabbit:listener-container connection-factory="connectionFactory" />

组件:

@Component
public class JmsComponent {

    private final Logger log = LoggerFactory.getLogger(JmsComponent.class);

    private final TopicExchange exchange = new TopicExchange("sample.exchange");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    private String received;

    public void send(String msg) {
        rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
    }

    public void bindToKey(String keyMask) {
        BindingBuilder.bind(queue).to(exchange).with(keyMask);
        rabbitTemplate.setExchange(exchange.getName());
    }


    public void sendByKey(String key, String msg) {
        rabbitTemplate.convertAndSend(exchange.getName(), key, new SimpleMessage(msg));
    }

    @RabbitListener(queues = "sample.queue")
    public void handle(SimpleMessage message) {
        log.info("================ Received  " + message.getMsg());
        received = message.getMsg();
    }

    public String getReceived() {
        return received;
    }

当我使用发送(包括 TopicExchange 之前)时 - 一切正常。消息被直接发送到队列,handle() 已经接收到它。 但是对于 TopicExchange.... 我尝试使用它:

@Test
public void bind() throws InterruptedException {
    jmsComponent.bindToKey("qq");
    jmsComponent.sendByKey("qq", "message");
    Thread.sleep(5000);
    Assert.isTrue("message".equals(jmsComponent.getReceived()));
}

测试总是失败,但在日志中我看到了这个 - DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [sample.exchange], routingKey = [qq] 怎么了??? 谢谢

【问题讨论】:

    标签: java spring rabbitmq spring-rabbit rabbitmq-exchange


    【解决方案1】:

    这...

    BindingBuilder.bind(queue).to(exchange).with(keyMask);
    

    ...除了创建一个 Binding 对象然后将其丢弃之外什么都不做。您需要获取 Binding 对象并在管理员上调用 declareBinding。您还需要声明交换。

    因为您的上下文中有管理员;最简单的方法是将&lt;rabbit:exchange/&gt; 添加到上下文中(连同绑定)。见the documentation

    <rabbit:queue id="myQueue" name="sample.queue"/>
    
    <topic-exchange name="sample.exchange">
        <bindings>
            <binding queue="myQueue" pattern="bucket.#"/>
        </bindings>
    </topic-exchange>
    

    顺便说一下,主题交换旨在通过关键模式进行路由;如果您只想使用固定键进行路由/绑定,例如qq,请使用直接交换。请参阅RabbitMQ Tutorials

    【讨论】:

      【解决方案2】:

      我更改了 Gary Russell 的组件使用答案:

      • 我已经添加了

        @Autowired
        private RabbitAdmin rabbitAdmin;
        
        @PostConstruct
        public void init(){
            rabbitAdmin.declareExchange(exchange);
        }
        
      • 并修改绑定方法:

        public void bindToKey(String keyMask) {
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(keyMask);
            rabbitAdmin.declareBinding(binding); // re-declare binding if mask changed
            rabbitTemplate.setExchange(exchange.getName());
        }
        
      • 然后测试就成功了!

      更多,我在运行时添加了更改绑定掩码:

      @Test
      public void bind() throws InterruptedException {
          jmsComponent.bindToKey("qq");
          jmsComponent.sendByKey("qq", "message");
          Thread.sleep(5000);
          Assert.isTrue("message".equals(jmsComponent.getReceived()));
      
          jmsComponent.bindToKey("eeeee");
          jmsComponent.sendByKey("eeeee", "message one");
          Thread.sleep(5000);
          Assert.isTrue("message one".equals(jmsComponent.getReceived()));
      }
      

      所有作品。

      【讨论】: