juncaoit

一:介绍

1.模型

  有两种情形,分别是轮训分发与公平分发。

  

 

2.出现的场景

  考虑到simple queue中的缺点。

  因为生产者发送消息后,消费者消费要花费时间,这个会造成消息的堆积。

 

二:Round robin--轮循

1.发送程序

  这个与简单程序类似,只是发送多条数据而已。

 1 package com.mq.work.round;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 public class RoundWorkSend {
 8     private static final String QUENE_NAME="test_work_queue";
 9     public static void main(String[] args) throws Exception {
10         //获取一个连接
11         Connection connection= ConnectionUtil.getConnection();
12         //从连接中获取一个通道
13         Channel channel=connection.createChannel();
14         //创建队列声明
15         channel.queueDeclare(QUENE_NAME,false,false,false,null);
16 
17         //消息与发送放入for循环
18         for (int i=0;i<50;i++){
19             String msg="hello "+i;
20             System.out.println("[send msg]:"+msg);
21             channel.basicPublish("",QUENE_NAME,null,msg.getBytes());
22             Thread.sleep(i*1);
23         }
24 
25         //关闭连接
26         channel.close();
27         connection.close();
28     }
29 }

 

2.消费者一

 1 package com.mq.work.round;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class RoundWorkReceive1 {
 9     private static final String QUENE_NAME="test_work_queue";
10     public static void main(String[] args)throws Exception{
11         //获取一个连接
12         Connection connection = ConnectionUtil.getConnection();
13         //创建通道
14         Channel channel = connection.createChannel();
15         //创建队列声明
16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
17         //创建消费者
18         DefaultConsumer consumer=new DefaultConsumer(channel){
19             @Override
20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
21                 String msg=new String(body,"utf-8");
22                 System.out.println("[1]receive msg:"+msg);
23                 try {
24                     Thread.sleep(200);
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 }finally {
28                     System.out.println("done");
29                 }
30             }
31         };
32         //监听队列
33         boolean autoAck=true;
34         channel.basicConsume(QUENE_NAME,autoAck,consumer);
35     }
36 }

 

3.消费者二

 1 package com.mq.work.round;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class RoundWorkReceive2 {
 9     private static final String QUENE_NAME="test_work_queue";
10     public static void main(String[] args)throws Exception{
11         //获取一个连接
12         Connection connection = ConnectionUtil.getConnection();
13         //创建通道
14         Channel channel = connection.createChannel();
15         //创建队列声明
16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
17         //创建消费者
18         DefaultConsumer consumer=new DefaultConsumer(channel){
19             @Override
20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
21                 String msg=new String(body,"utf-8");
22                 System.out.println("[2]receive msg:"+msg);
23                 try {
24                     Thread.sleep(300);
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 }finally {
28                     System.out.println("done");
29                 }
30             }
31         };
32         //监听队列
33         boolean autoAck=true;
34         channel.basicConsume(QUENE_NAME,autoAck,consumer);
35     }
36 }

 

4.现象

  send

  

  receive1:

  

  receive2:

  

 

三:fair dispatcher

1.介绍

  使用公平分发需要关闭自动应答,改成手动。

  有一种通俗的说法是:能者多劳。 

 

2.生产者

  需要改动的地方是:每个消费者在得到确认消息之前,消息队列不得发送一个消息给消费者,一次只能处理一个消息。

 1 package com.mq.work.fair;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 public class FairWorkSend {
 8     private static final String QUENE_NAME="test_work_queue";
 9     public static void main(String[] args) throws Exception {
10         //获取一个连接
11         Connection connection= ConnectionUtil.getConnection();
12         //从连接中获取一个通道
13         Channel channel=connection.createChannel();
14         //创建队列声明
15         channel.queueDeclare(QUENE_NAME,false,false,false,null);
16 
17         //限制发送给一个消费者不得超过1条
18         int prefetchCount=1;
19         channel.basicQos(prefetchCount);
20 
21         //消息与发送放入for循环
22         for (int i=0;i<50;i++){
23             String msg="hello "+i;
24             System.out.println("[send msg]:"+msg);
25             channel.basicPublish("",QUENE_NAME,null,msg.getBytes());
26             Thread.sleep(i*1);
27         }
28 
29         //关闭连接
30         channel.close();
31         connection.close();
32     }
33 }

 

3.消费者一

  需要改动的行数,14,18,33,38

 1 package com.mq.work.fair;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class FairWorkReceive1 {
 9     private static final String QUENE_NAME="test_work_queue";
10     public static void main(String[] args)throws Exception{
11         //获取一个连接
12         Connection connection = ConnectionUtil.getConnection();
13         //创建通道
14         final Channel channel = connection.createChannel();
15         //创建队列声明
16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
17 
18         //一次只能发送一个消息
19         channel.basicQos(1);
20 
21         //创建消费者
22         DefaultConsumer consumer=new DefaultConsumer(channel){
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 String msg=new String(body,"utf-8");
26                 System.out.println("[1]receive msg:"+msg);
27                 try {
28                     Thread.sleep(200);
29                 } catch (InterruptedException e) {
30                     e.printStackTrace();
31                 }finally {
32                     System.out.println("done");
33                     //手动应答
34                     channel.basicAck(envelope.getDeliveryTag(),false);
35                 }
36             }
37         };
38         //监听队列,不是自动应答
39         boolean autoAck=false;
40         channel.basicConsume(QUENE_NAME,autoAck,consumer);
41     }
42 }

 

3.消费者二

  与消费者一不同点在于消费每个消息的时间不同。

 1 package com.mq.work.fair;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class FairWorkReceive2 {
 9     private static final String QUENE_NAME="test_work_queue";
10     public static void main(String[] args)throws Exception{
11         //获取一个连接
12         Connection connection = ConnectionUtil.getConnection();
13         //创建通道
14         final Channel channel = connection.createChannel();
15         //创建队列声明
16         channel.queueDeclare(QUENE_NAME,false,false,false,null);
17 
18         //一次只能发送一个消息
19         channel.basicQos(1);
20 
21         //创建消费者
22         DefaultConsumer consumer=new DefaultConsumer(channel){
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 String msg=new String(body,"utf-8");
26                 System.out.println("[1]receive msg:"+msg);
27                 try {
28                     Thread.sleep(500);
29                 } catch (InterruptedException e) {
30                     e.printStackTrace();
31                 }finally {
32                     System.out.println("done");
33                     //手动应答
34                     channel.basicAck(envelope.getDeliveryTag(),false);
35                 }
36             }
37         };
38         //监听队列,不是自动应答
39         boolean autoAck=false;
40         channel.basicConsume(QUENE_NAME,autoAck,consumer);
41     }
42 }

 

4.现象

  消费者一:

  

 

  消费者二:

   

 

分类:

技术点:

相关文章: