1、Rabbitmq是什么?用来干嘛?

大家好,我们今天一起来学习Rabbitmq,学习一门技术之前,需要明白,这门技术用来干什么?能解决什么问题?Rabbitmq是基于Erlang语言开发的一款消息中间件,市场上有很多消息中间件,如:ActiveMQ、Kafka、RocketMQ等…,它们的优缺点,暂不评论,各家都说各家好。这里要强调一点,Erlang设计之初就是用来解决高并发,利用多核Cpu编程。

可能很多朋友不知道消息中间件,我个人理解消息中间件就是接收消息、存储消息、投递消息,我们使用消息中间件,很多时候就是在使用消息中间件的异步处理模式。那在什么业务场景下,我们会使用消息中间件呢?首先我来抛出三个问题:

  1. 同步串行处理耗时;
  2. 并发流量削峰;
  3. 程序设计解耦;

同步串行处理耗时:

软件设计开发中典型例子,就是用户注册成功后邮箱通知,同步rpc远程调用邮箱服务是一个耗时操作,如图:

Rabbitmq的通用使用详解

图中描述用户注册串行处理的过程,总耗时 =  注册请求到达业务服务器的耗时(1) + 业务服务器处理注册逻辑的耗时(2) + 业务服务器请求到达邮件服务器的耗时(3) + 邮件服务器处理发送邮件的耗时(4)  + 邮件服务器应答到业务服务器的耗时(5) + 业务服务器应答到用户的耗时(6),用户使用程序,串行处理时,需要接受6步总耗时,其实用户使用程序仅需1、2、6的耗时,这是最好的体验。那有没有办法解决呢?有,那就是使用Rabbitmq消息中间件,将邮件发送异步处理,如图:

Rabbitmq的通用使用详解

从图中我们可以看出,用户注册模块(上半部分)和邮件发送模块(下半部分),Rabbitmq将用户注册与邮件发送异步分开了,用户注册成功后,给Rabbitmq发一个消,说某人注册成功了,你收到消息后,请马上发邮件告知一下,然后立即返回,然后Rabbitmq会异步通知消息消费服务器去发送邮件,请记住业务服务器发送消息到Rabbbitmq,和Rabbitmq推送消息到消息消费服务器是异步的

我这里说一下,什么是同步和异步,打个比方有条河,河下游有一个人距离你200米,你需要拿一样东西给他。此时你有两种选择,一种是直接游过去给他,这叫同步。另一种直接将东西绑到一块木头上漂过去给他,这叫异步。同一样都是把东西给到下游的人,第一种游过去给他,则是耗时耗力。第二种仅是多耗绑东西的时间(可不计),然后你就可以去干其它事情,木头你可以理解为Rabbitmq。现在我们再来看看,用户注册总耗时 = 注册请求到达业务服务器的耗时(1) + 业务服务器处理注册逻辑的耗时(2) + 投递消息到Rabbitmq的耗时(3、4和5) + 业务服务器应答到用户的耗时(6),其中投递消息到Rabbitmq的耗时很短,可忽略不计(tcp通信且网络通道已建立)。这样我们就用Rabbitmq异步消息通知机制解决了串行问题。

并发流量削峰:

经典例子就是秒杀活动,阿里双11秒杀,短时间上亿的用户涌入,瞬间流量巨大(高并发),比如:200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量是有限的100~500件左右。这样能购买到该件商品的用户也只有几百人左右, 但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品。下面我画图简要说明此过程,重在理解,真实场景肯定需要更多容错处理:

Rabbitmq的通用使用详解

注意: 图中黑线部分与红线部分是异步操作。

  1. 客户端黑线(1)请求到达服务器,进入秒杀请求处理服务器(2)的逻辑,首先通过商品ID在Redis判断商品是否存在,不存在黑线(5)应答秒杀失败到客户端,存在再判断商品是否已秒杀完,若完黑线(5)应答秒杀失败到客户端,若未完黑线(3)向Rabbitmq投递消息,消息入队列后黑线(4)应答秒杀请求处理服务器,黑线(5)再应答客户端秒杀成功。
  2. Rabbitmq红线(1)推送消息到秒杀处理服务器,然后进入秒杀处理服务器(2)的逻辑,主要是订单生成,库存减1等…,然后红线(3)应答Rabbitmq(关于消息推送后是否应答,后面会论,现在暂时这样理解)。

注意:这里一定要记住,Rabbitmq消息的生产投递和消息的推送消费是异步的,其次客户端应该是有支付逻辑的,简单一点,就是收到秒杀成功后,就定时向秒杀处理服务器要订单支付消息,获取成功后就去支付。

程序设计解耦:

其实同步串行处理耗时和并发流量削峰,就是得益于Rabbitmq的快速消息投递和异步消息推送处理,解耦的本质就是程序生产消息,只需知道这个消息用来干嘛就行,不用管消费者怎么去实现。如:生产者发送消息说,你往xxxx.com给我发送一封邮件,生产者只需要把消息推送到Rabbitmq就行,生产者不用管消费者怎么发,用那家公司邮件服务去发。

2、Rabbitmq的基本架构:

Rabbitmq的通用使用详解

组成部分简要说明:

Broker:  Rabbitmq服务进程,主要包括两个部分Exchange(交换机)和Queue(队列);

Exchange:  交换机,按一定的规则将消息路由到某个队列,不同类型的交换机有不同的路由规则;

Queue:  消息队列,存储消息的队列,消息到达队列后推送给消费者。

Producer:  消息生产者,即生产方客户端,生产者将消息投递到消息队列。

Consumer:  消息消费者,即消费方客户端,接收消息队列推送的消息。

Connect:  可以理解为一个Tcp连接,Producer和Consumer都是通过TCP连接到RabbitMQ Server的;

Channel:  虚拟连接,它建立在上述的Tcp连接中,数据流动都是在Channel中进行的,也就是说,一般情况是程序开始建立Tcp连接,第二步就是建立Channel。

接下来将会详细介绍Rabbitmq各个组成部分、消息投放确认机制、消息路由机制、消息存储机制、消息消费确认机制、消息消费推送机制。

生产者(Producer)和消费者(Consumer):

Producer和Consumer消息生者和发送者,下面用示例程序来说明他们。

1、创建一个简单的Maven工程;

2、引入Rabbitmq的Maven依赖;

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.3.5</version>
        </dependency>
    </dependencies>

3、编写Producer和Consumer代码;

Producer代码:

public static void main(String[] args) {

        // 连结工厂,可以理解为连结工厂完成与Rabbitmq服务器连结池的创建。
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 设置虚拟主机名,默认是/,虚拟主机可理解为java语言的包名,其作用就是分域。
        connectionFactory.setVirtualHost("test");
        // 设置Rabbitmq服务器的IP,如果是默认的guest用户,只能从localhost去访问。
        connectionFactory.setHost("200.159.253.98");
        // 设置Rabbitmq服务器的Port,默认就是5672,如果就是5672可以不设置。
        connectionFactory.setPort(5672);

        // 设置Rabbitmq的登录用户名。
        connectionFactory.setUsername("sd");
        // 设置Rabbitmq的登录密码。
        connectionFactory.setPassword("sd");

        try {
            // 获取连结。
            Connection connection = connectionFactory.newConnection();

            // 创建通道。
            Channel channel = connection.createChannel(1);
            
            //channel.basicQos(1);

            // 创建队列,这个函数的调用是一个幂等性操作。
            channel.queueDeclare("test_queue", false, false, false, null);

            String message = "Hello World!";

            channel.basicPublish("", "test_queue", null, message.getBytes());

            System.out.println("生产者 send: " + message);

            // 关闭通道。
            channel.close();
            // 关闭连结。
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return;
    }

Consumer代码:

public static void main(String[] args) {

        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();

            connectionFactory.setUsername("sd");
            connectionFactory.setPassword("sd");
            connectionFactory.setVirtualHost("test");
            connectionFactory.setHost("200.159.253.98");

            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel(1);

            channel.queueDeclare("test_queue", false, false, false, null);
            
            //channel.basicQos(1);

            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    // 这个方法仅是调用DefaultConsumer的空函数,什么也不做,可以去掉。
                    super.handleDelivery(consumerTag, envelope, properties, body);

                    String strRcv = new String(body, "UTF-8");
                    System.out.println("消费者  rcv: " + strRcv);
                }
            };

            // 设置队列监听和推送消息回调。
            channel.basicConsume("test_queue", true, consumer);

        } catch (Exception ex) {
            ex.printStackTrace();
        }

        return;
    }

我们可以先启动生产者去投递一个消息,再启动消费者去消费,如果成功消费,则证明异步模式。

简述连结(Connect)、通道(Channel)、交换机(Exchange)、路由键(Routing Key)和队列(Queue):

客户端,也就是Producer和Consumer,通信时直接创建对象有Connect和Channel,Connect代表着一个Tcp长连结,Connect是从ConnectionFactory连结工厂获得,ConnectionFactory可以理解为Springmvc持久层的数据库连结池,连结池这个东西,本质是解决Tcp连结过程的耗时,因为Tcp连结时要三次握手,断掉时要4次挥手,在高并发情况下,还会频繁向操作系统申请和释放内核资源,说白了就是在程序不用时,Tcp连结已经帮你做好,你要用时直接拿去用就行。Channel网上一些资料说是虚拟通道,但这种解释有点牵强,因为与Rabbitmq通信传输数据时用的就是Tcp,大家都知道网络传输的就是字节流,Channel本质仅是在协议层做到了一个连结可以处理多个消息队列,如果不这样做一个Tcp连结只能处理一个消息队列,这是一种流费,因为对于操作系统来说,建立和关闭Tcp连接是有代价的,频繁的建立关闭Tcp连接对于操作系统的性能有很大的影响,而且Tcp的连接数也有限制,这也限制了系统处理高并发的能力。上面这段话有点抽象可能不好理解,如果真不能理解,可以先不管,可以加qq群(907128466),在群里问我。现在只需理解,真正与Rabbitmq通信的是Channel,一个通道只能操作一个消息队列,一个Connect可以创建多个Channel。

后端,也就是(Rabbitmq Broker),交换机(Exchange)、路由键(Routing Key)、消息队列(Queue),如图:

Rabbitmq的通用使用详解

交换机(Exchange)本质是接收消息和路由消息,根据自身的类型和路由键值(Routing Key),将消息路由到绑定队列,交换机不存储消息,队列才存储消息,如果路由到的队列不存在,消息将会被丢弃,不同类型交换机,路由策略不一样,且消息一定是先发到交换机,再路由到与之绑定的队列,这也是Rabbitmq的消息路由机制,交换机(Exchange)类型有如下几类:

直连交换机(Direct Exchange):

Rabbitmq默认交换机类型就是直连类型,直连类型的交换机路由规则很简单,它会把消息路由到那些Binding Key与Routing Key完全匹配的消息队列(Queue)中。如:

若绑定一个“rabbit”的路由键(Binding Key),那生产者投递消息时必需指定“Rabbit”的Routing Key才能被路由到对应绑定的队列。Rabbitmq每创建一个队列,如果不绑定到交换机,队列默认会绑定到Rabbitmq的默认直连交换机,绑定的Routing Key就是队列的名字,Rabbitmq的默认直连交换机名字是一个空字窜,直连交换机是一对一消息投递。

相关文章:

  • 2021-11-20
  • 2022-12-23
  • 2022-12-23
  • 2021-05-03
猜你喜欢
  • 2022-12-23
  • 2021-11-20
  • 2022-12-23
相关资源
相似解决方案