1、Rabbitmq是什么?用来干嘛?
大家好,我们今天一起来学习Rabbitmq,学习一门技术之前,需要明白,这门技术用来干什么?能解决什么问题?Rabbitmq是基于Erlang语言开发的一款消息中间件,市场上有很多消息中间件,如:ActiveMQ、Kafka、RocketMQ等…,它们的优缺点,暂不评论,各家都说各家好。这里要强调一点,Erlang设计之初就是用来解决高并发,利用多核Cpu编程。
可能很多朋友不知道消息中间件,我个人理解消息中间件就是接收消息、存储消息、投递消息,我们使用消息中间件,很多时候就是在使用消息中间件的异步处理模式。那在什么业务场景下,我们会使用消息中间件呢?首先我来抛出三个问题:
- 同步串行处理耗时;
- 并发流量削峰;
- 程序设计解耦;
同步串行处理耗时:
软件设计开发中典型例子,就是用户注册成功后邮箱通知,同步rpc远程调用邮箱服务是一个耗时操作,如图:
图中描述用户注册串行处理的过程,总耗时 = 注册请求到达业务服务器的耗时(1) + 业务服务器处理注册逻辑的耗时(2) + 业务服务器请求到达邮件服务器的耗时(3) + 邮件服务器处理发送邮件的耗时(4) + 邮件服务器应答到业务服务器的耗时(5) + 业务服务器应答到用户的耗时(6),用户使用程序,串行处理时,需要接受6步总耗时,其实用户使用程序仅需1、2、6的耗时,这是最好的体验。那有没有办法解决呢?有,那就是使用Rabbitmq消息中间件,将邮件发送异步处理,如图:
从图中我们可以看出,用户注册模块(上半部分)和邮件发送模块(下半部分),Rabbitmq将用户注册与邮件发送异步分开了,用户注册成功后,给Rabbitmq发一个消,说某人注册成功了,你收到消息后,请马上发邮件告知一下,然后立即返回,然后Rabbitmq会异步通知消息消费服务器去发送邮件,请记住业务服务器发送消息到Rabbbitmq,和Rabbitmq推送消息到消息消费服务器是异步的。
我这里说一下,什么是同步和异步,打个比方有条河,河下游有一个人距离你200米,你需要拿一样东西给他。此时你有两种选择,一种是直接游过去给他,这叫同步。另一种直接将东西绑到一块木头上漂过去给他,这叫异步。同一样都是把东西给到下游的人,第一种游过去给他,则是耗时耗力。第二种仅是多耗绑东西的时间(可不计),然后你就可以去干其它事情,木头你可以理解为Rabbitmq。现在我们再来看看,用户注册总耗时 = 注册请求到达业务服务器的耗时(1) + 业务服务器处理注册逻辑的耗时(2) + 投递消息到Rabbitmq的耗时(3、4和5) + 业务服务器应答到用户的耗时(6),其中投递消息到Rabbitmq的耗时很短,可忽略不计(tcp通信且网络通道已建立)。这样我们就用Rabbitmq异步消息通知机制解决了串行问题。
并发流量削峰:
经典例子就是秒杀活动,阿里双11秒杀,短时间上亿的用户涌入,瞬间流量巨大(高并发),比如:200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量是有限的100~500件左右。这样能购买到该件商品的用户也只有几百人左右, 但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品。下面我画图简要说明此过程,重在理解,真实场景肯定需要更多容错处理:
注意: 图中黑线部分与红线部分是异步操作。
- 客户端黑线(1)请求到达服务器,进入秒杀请求处理服务器(2)的逻辑,首先通过商品ID在Redis判断商品是否存在,不存在黑线(5)应答秒杀失败到客户端,存在再判断商品是否已秒杀完,若完黑线(5)应答秒杀失败到客户端,若未完黑线(3)向Rabbitmq投递消息,消息入队列后黑线(4)应答秒杀请求处理服务器,黑线(5)再应答客户端秒杀成功。
- Rabbitmq红线(1)推送消息到秒杀处理服务器,然后进入秒杀处理服务器(2)的逻辑,主要是订单生成,库存减1等…,然后红线(3)应答Rabbitmq(关于消息推送后是否应答,后面会论,现在暂时这样理解)。
注意:这里一定要记住,Rabbitmq消息的生产投递和消息的推送消费是异步的,其次客户端应该是有支付逻辑的,简单一点,就是收到秒杀成功后,就定时向秒杀处理服务器要订单支付消息,获取成功后就去支付。
程序设计解耦:
其实同步串行处理耗时和并发流量削峰,就是得益于Rabbitmq的快速消息投递和异步消息推送处理,解耦的本质就是程序生产消息,只需知道这个消息用来干嘛就行,不用管消费者怎么去实现。如:生产者发送消息说,你往xxxx.com给我发送一封邮件,生产者只需要把消息推送到Rabbitmq就行,生产者不用管消费者怎么发,用那家公司邮件服务去发。
2、Rabbitmq的基本架构:
组成部分简要说明:
Broker: Rabbitmq服务进程,主要包括两个部分Exchange(交换机)和Queue(队列);
Exchange: 交换机,按一定的规则将消息路由到某个队列,不同类型的交换机有不同的路由规则;
Queue: 消息队列,存储消息的队列,消息到达队列后推送给消费者。
Producer: 消息生产者,即生产方客户端,生产者将消息投递到消息队列。
Consumer: 消息消费者,即消费方客户端,接收消息队列推送的消息。
Connect: 可以理解为一个Tcp连接,Producer和Consumer都是通过TCP连接到RabbitMQ Server的;
Channel: 虚拟连接,它建立在上述的Tcp连接中,数据流动都是在Channel中进行的,也就是说,一般情况是程序开始建立Tcp连接,第二步就是建立Channel。
接下来将会详细介绍Rabbitmq各个组成部分、消息投放确认机制、消息路由机制、消息存储机制、消息消费确认机制、消息消费推送机制。
生产者(Producer)和消费者(Consumer):
Producer和Consumer消息生者和发送者,下面用示例程序来说明他们。
1、创建一个简单的Maven工程;
2、引入Rabbitmq的Maven依赖;
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.5</version>
</dependency>
</dependencies>
3、编写Producer和Consumer代码;
Producer代码:
public static void main(String[] args) {
// 连结工厂,可以理解为连结工厂完成与Rabbitmq服务器连结池的创建。
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置虚拟主机名,默认是/,虚拟主机可理解为java语言的包名,其作用就是分域。
connectionFactory.setVirtualHost("test");
// 设置Rabbitmq服务器的IP,如果是默认的guest用户,只能从localhost去访问。
connectionFactory.setHost("200.159.253.98");
// 设置Rabbitmq服务器的Port,默认就是5672,如果就是5672可以不设置。
connectionFactory.setPort(5672);
// 设置Rabbitmq的登录用户名。
connectionFactory.setUsername("sd");
// 设置Rabbitmq的登录密码。
connectionFactory.setPassword("sd");
try {
// 获取连结。
Connection connection = connectionFactory.newConnection();
// 创建通道。
Channel channel = connection.createChannel(1);
//channel.basicQos(1);
// 创建队列,这个函数的调用是一个幂等性操作。
channel.queueDeclare("test_queue", false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", "test_queue", null, message.getBytes());
System.out.println("生产者 send: " + message);
// 关闭通道。
channel.close();
// 关闭连结。
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
return;
}
Consumer代码:
public static void main(String[] args) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("sd");
connectionFactory.setPassword("sd");
connectionFactory.setVirtualHost("test");
connectionFactory.setHost("200.159.253.98");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel(1);
channel.queueDeclare("test_queue", false, false, false, null);
//channel.basicQos(1);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// 这个方法仅是调用DefaultConsumer的空函数,什么也不做,可以去掉。
super.handleDelivery(consumerTag, envelope, properties, body);
String strRcv = new String(body, "UTF-8");
System.out.println("消费者 rcv: " + strRcv);
}
};
// 设置队列监听和推送消息回调。
channel.basicConsume("test_queue", true, consumer);
} catch (Exception ex) {
ex.printStackTrace();
}
return;
}
我们可以先启动生产者去投递一个消息,再启动消费者去消费,如果成功消费,则证明异步模式。
简述连结(Connect)、通道(Channel)、交换机(Exchange)、路由键(Routing Key)和队列(Queue):
客户端,也就是Producer和Consumer,通信时直接创建对象有Connect和Channel,Connect代表着一个Tcp长连结,Connect是从ConnectionFactory连结工厂获得,ConnectionFactory可以理解为Springmvc持久层的数据库连结池,连结池这个东西,本质是解决Tcp连结过程的耗时,因为Tcp连结时要三次握手,断掉时要4次挥手,在高并发情况下,还会频繁向操作系统申请和释放内核资源,说白了就是在程序不用时,Tcp连结已经帮你做好,你要用时直接拿去用就行。Channel网上一些资料说是虚拟通道,但这种解释有点牵强,因为与Rabbitmq通信传输数据时用的就是Tcp,大家都知道网络传输的就是字节流,Channel本质仅是在协议层做到了一个连结可以处理多个消息队列,如果不这样做一个Tcp连结只能处理一个消息队列,这是一种流费,因为对于操作系统来说,建立和关闭Tcp连接是有代价的,频繁的建立关闭Tcp连接对于操作系统的性能有很大的影响,而且Tcp的连接数也有限制,这也限制了系统处理高并发的能力。上面这段话有点抽象可能不好理解,如果真不能理解,可以先不管,可以加qq群(907128466),在群里问我。现在只需理解,真正与Rabbitmq通信的是Channel,一个通道只能操作一个消息队列,一个Connect可以创建多个Channel。
后端,也就是(Rabbitmq Broker),交换机(Exchange)、路由键(Routing Key)、消息队列(Queue),如图:
交换机(Exchange)本质是接收消息和路由消息,根据自身的类型和路由键值(Routing Key),将消息路由到绑定队列,交换机不存储消息,队列才存储消息,如果路由到的队列不存在,消息将会被丢弃,不同类型交换机,路由策略不一样,且消息一定是先发到交换机,再路由到与之绑定的队列,这也是Rabbitmq的消息路由机制,交换机(Exchange)类型有如下几类:
直连交换机(Direct Exchange):
Rabbitmq默认交换机类型就是直连类型,直连类型的交换机路由规则很简单,它会把消息路由到那些Binding Key与Routing Key完全匹配的消息队列(Queue)中。如:
若绑定一个“rabbit”的路由键(Binding Key),那生产者投递消息时必需指定“Rabbit”的Routing Key才能被路由到对应绑定的队列。Rabbitmq每创建一个队列,如果不绑定到交换机,队列默认会绑定到Rabbitmq的默认直连交换机,绑定的Routing Key就是队列的名字,Rabbitmq的默认直连交换机名字是一个空字窜,直连交换机是一对一消息投递。