rabbitmq入门学习五
1. 主题模型(Topics)
主题模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队
看下图例子:
解释:
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配到
业务需求:
最常见的增删改,需要不同的系统去处理,
1.1 生产者代码
使用topic类型的Exchange,发送消息的routing key有3种: item.isnert、item.update、item.delete
[Java] 纯文本查看 复制代码
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
public class TopicsProducer { //队列名称 private static final String QUEUE_INFORM_1 = "topic_exchange_queue_1"; private static final String QUEUE_INFORM_2 = "topic_exchange_queue_2"; private static final String QUEUE_INFORM_3 = "topic_exchange_queue_3"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_UPDATE="item.update"; private static final String ROUTINGKEY_DELETE="item.delete"; private static final String ROUTINGKEY_INSTALL="item.install"; private static final String ROUTINGKEY_ALL="item.#"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { //建立新连接 connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); channel.queueDeclare(QUEUE_INFORM_1, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_2, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_3, true, false, false, null); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_UPDATE); channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_DELETE); channel.queueBind(QUEUE_INFORM_2, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_INSTALL); channel.queueBind(QUEUE_INFORM_3, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_ALL); //发送消息 //发送消息的时候指定routingKey String message = "新增商品"; channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.install", null, message.getBytes());// String message = "删除商品";// channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.delete", null, message.getBytes());// String message = "修改商品";// channel.basicPublish(EXCHANGE_TOPICS_INFORM, "item.update", null, message.getBytes()); System.out.println("send to mq " + message); } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } }} |
1.2 消费者代码1
消费者1代码,接收两种路由模式item.update和item.delete
[Java] 纯文本查看 复制代码
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class TopicsComsumer1 { //队列名称 private static final String QUEUE_INFORM_1 = "topic_exchange_queue_1";; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_UPDATE="item.update"; private static final String ROUTINGKEY_DELETE="item.delete"; public static void main(String[] args) throws Exception { //建立新连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_INFORM_1,true,false,false,null); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_UPDATE); channel.queueBind(QUEUE_INFORM_1, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_DELETE); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息内容 String message= new String(body,"utf-8"); System.out.println("【消费者1】:"+message); } }; //监听队列 channel.basicConsume(QUEUE_INFORM_1,true,defaultConsumer); }} |
1.3 消费者代码2
消费者2代码,只接收item.Install
[Java] 纯文本查看 复制代码
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public class TopicsComsumer2 { //队列名称 private static final String QUEUE_INFORM_2 = "topic_exchange_queue_2"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_INSTALL="item.install"; public static void main(String[] args) throws Exception { //建立新连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_INFORM_2,true,false,false,null); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 channel.queueBind(QUEUE_INFORM_2, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_INSTALL); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息内容 String message= new String(body,"utf-8"); System.out.println("【消费者2】:"+message); } }; //监听队列 channel.basicConsume(QUEUE_INFORM_2,true,defaultConsumer); }} |
1.4 消费者代码3
消费者代码3,只接收item.#,只要以item.开头的路由key该消费者都可以接收
[Java] 纯文本查看 复制代码
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public class TopicsComsumer3 { //队列名称 private static final String QUEUE_INFORM_3 = "topic_exchange_queue_3"; private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; private static final String ROUTINGKEY_ALL="item.#"; public static void main(String[] args) throws Exception { //建立新连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_INFORM_3,true,false,false,null); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); //进行交换机和队列绑定 channel.queueBind(QUEUE_INFORM_3, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_ALL); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息内容 String message= new String(body,"utf-8"); System.out.println("【消费者3】:"+message); } }; //监听队列 channel.basicConsume(QUEUE_INFORM_3,true,defaultConsumer); }} |
1.5 测试
运行生产者代码测试,我们发现交换机绑定的队列中routingkey有值了,分别为:
Item.delete,item.update,item.install,item.#。
然后我们继续运行三个消费者查看消息接收情况:通过下面三张消费者截图,我们可以看出,只有绑定了item.install和item.#的队列可以接收消息,而队列1绑定的是item.delete和item.update无法接收消息。
消费者1:
消费者2:
消费者3: