【Question title】: Java RabbitMQ - How do I switch Keyword in Topic exchange?
【Posted】: 2017-01-04 19:29:17
【Question】:

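I have a list of keywords, and I want my publisher to send one message for each keyword.

But when I iterate over all the keywords and publish, my subscriber receives every message, whether or not it is subscribed to that keyword.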

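import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BrokerProducer {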
public static String[] topics = {"Beauty","Meat","Soft Drinks","Fruits and Vegetables","Alcoholic Drinks"};


/**
 * @param args the command line arguments
 */
public static void main(String[] args){

    try {
        CreateProducer();
        /*
        Broker RabbitMQ at 155.54.204.46
        (diana.inf.um.es)
        – Port 5672
        – Username master
        – Password master*/
        // TODO code application logic here
    } catch (IOException ex) {
        Logger.getLogger(BrokerProducer.class.getName()).log(Level.SEVERE, null, ex);
    } catch (TimeoutException ex) {
        Logger.getLogger(BrokerProducer.class.getName()).log(Level.SEVERE, null, ex);
    }

    System.out.print("success");
}

private static void CreateProducer() throws IOException, TimeoutException{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("master");
    factory.setPassword("master");
    factory.setHost("diana.inf.um.es"); 
    factory.setPort(5672);
    Connection conn = factory.newConnection();

    Channel channel = conn.createChannel();

    channel.exchangeDeclare("SupermarketExchange1", "topic");
    //channel.basicPublish("SupermarketExchange1", "testrouting.*", null, "testmessage".getBytes());
    //channel.basicPublish("SupermarketExchange1", "testrouting.Fruits and Vegetables", null, "testmessage".getBytes());

    for(String top: topics){
        //channel.exchangeDeclare("SupermarketExchange1", "topic");
        publish(channel, "SupermarketExchange1", top);
        System.out.println(top);
        //channel.exchangeDelete("SupermarketExchange1");
    }
}

private static void publish(Channel channel, String Exchange, String Topic) throws IOException{
    // Trailing space added so the topic name is not glued to the word "to"
    String Message = "You are subscribed to " + Topic;

    channel.basicPublish(Exchange,"testrouting."+ Topic, null, Message.getBytes());

}

}

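【Comments】:

  • It looks like your consumer is subscribed to all routing keys. Read this and this article; they explain how routing works in RabbitMQ.
  • It shouldn't be; I can post my subscriber code. The routing key appears to be "testrouting.Meat", but it still receives everything.

Tags: java rabbitmq exchange-server publish-subscribe amqp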
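
As a rough illustration of the routing rules the first comment refers to, here is a minimal sketch of how a topic exchange compares a binding key with a routing key: words are separated by dots, `*` stands for exactly one word, and `#` for zero or more words. The class `TopicMatchSketch` and its `matches` method are hypothetical names made up for this illustration; they are not part of the RabbitMQ client, and the broker's real implementation differs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that only models topic-exchange matching; it does not talk to a broker.
final class TopicMatchSketch {

    // True if routingKey matches the binding pattern:
    // '.' separates words, '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may swallow zero or more of the remaining words
            for (int n = j; n <= k.length; n++) {
                if (matches(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still has words, key does not
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> bindings = Arrays.asList("testrouting.Meat", "testrouting.*", "testrouting.#");
        String[] keys = {"testrouting.Meat", "testrouting.Beauty", "testrouting.Soft Drinks"};
        for (String binding : bindings) {
            for (String key : keys) {
                System.out.println(binding + " vs " + key + " -> " + matches(binding, key));
            }
        }
    }
}
```

Under these rules a queue bound only with testrouting.Meat would not be handed a message published as testrouting.Beauty. A space does not split a word, so "Soft Drinks" counts as a single word after the dot.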


【Solution 1】:

Subscriber:

public static void CreateConsumer(String subTopic) throws TimeoutException, IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("master");
    factory.setPassword("master");
    factory.setHost("diana.inf.um.es");
    factory.setPort(5672);
    Connection conn = factory.newConnection();

    Channel channel = conn.createChannel();
    channel.exchangeDeclare("SupermarketExchange1", "topic");
    //channel.basicPublish("SupermarketExchange", "testrouting", null, "testmessage".getBytes());
    channel.queueDeclare("IncomingQueue2", false, false,false,null);
    System.out.println("createconsumer");
    channel.queueBind("IncomingQueue2", "SupermarketExchange1", "testrouting."+ subTopic);

    JonathanConsumer consum = new JonathanConsumer(channel);
    channel.basicConsume("IncomingQueue2",false, consum);


}

And handleDelivery:

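import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

// Log.d below is presumably android.util.Log (an assumption; the post does not show the import)
import android.util.Log;

public class JonathanConsumer extends DefaultConsumer {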
    public JonathanConsumer(Channel channel) {
        super(channel);
    }

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    super.handleDelivery(consumerTag, envelope, properties, body); // no-op in DefaultConsumer
    System.out.println("Handle delivery");

    Log.d("MyTag", new String(body));
    System.out.println(new String(body));
}
}
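
One possible explanation, not confirmed anywhere in this thread, is that the queue IncomingQueue2 was reused across several runs with different subTopic values. queueBind adds a binding rather than replacing the previous one, and a queue declared as non-exclusive and non-auto-delete outlives the connection, so earlier bindings would still be attached. The sketch below models that effect in plain Java; `SharedQueueBindingSketch`, `ModelQueue`, `bind` and `publish` are hypothetical names, no broker is involved, and matching is reduced to exact equality because the binding keys here contain no wildcards.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of one named queue whose bindings pile up over time.
final class SharedQueueBindingSketch {

    static final class ModelQueue {
        final Set<String> bindingKeys = new LinkedHashSet<>(); // kept until explicitly removed
        final List<String> delivered = new ArrayList<>();
    }

    // Models queueBind: adds a binding and leaves existing ones in place.
    static void bind(ModelQueue queue, String bindingKey) {
        queue.bindingKeys.add(bindingKey);
    }

    // Models a publish through the exchange; with wildcard-free binding keys a match is plain equality.
    static void publish(ModelQueue queue, String routingKey, String body) {
        if (queue.bindingKeys.contains(routingKey)) {
            queue.delivered.add(body);
        }
    }

    public static void main(String[] args) {
        ModelQueue incomingQueue2 = new ModelQueue();

        // Assumed history: two earlier runs bound the same queue name for other topics.
        bind(incomingQueue2, "testrouting.Beauty");
        bind(incomingQueue2, "testrouting.Soft Drinks");
        // Current run: the subscriber only asks for Meat.
        bind(incomingQueue2, "testrouting.Meat");

        String[] topics = {"Beauty", "Meat", "Soft Drinks", "Fruits and Vegetables", "Alcoholic Drinks"};
        for (String topic : topics) {
            publish(incomingQueue2, "testrouting." + topic, "You are subscribed to " + topic);
        }

        // Lists every message whose key matches any binding still attached to the queue.
        incomingQueue2.delivered.forEach(System.out::println);
    }
}
```

If that is what happened, deleting IncomingQueue2 once, or letting the broker create a fresh exclusive queue for each subscriber with channel.queueDeclare().getQueue(), should leave only the intended binding in place.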

【Comments】:
