I. Overview

II. Example Demo

I. Overview

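Delayed messages show up everywhere in day-to-day systems:

1. An order that has not been paid within 10 minutes of creation is cancelled automatically.

2. An email is pushed on a timer, once every 30 minutes.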

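The most common approach is a scheduled task that polls the database. That works fine at small data volumes, but with tens or hundreds of millions of rows, reading the data becomes the bottleneck, and processing it with a full table scan is a last resort. There are workarounds, though. Here is how order splitting is handled at my company:

Online orders grow by more than 500,000 a day, a rate that a single table has long been unable to sustain. The approach taken is order archiving: the hot table keeps only the last 2-3 days of orders and everything older is moved into a history order table, which keeps the hot data under 2 million rows.
With that in place, cancelling orders that are still unpaid after 10 minutes can be done with a plain table scan without running into read-performance problems.

This approach is simple, but it requires negotiating a compromise with the business side. This article covers another approach: RabbitMQ delay queues. RabbitMQ does not implement delay queues directly, but they can be emulated with properties that RabbitMQ does provide, and there is also a companion plugin, rabbitmq_delayed_message_exchange. The RabbitMQ properties used here are introduced first.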

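1. Message Time To Live (TTL)

x-message-ttl: the message expiry time. Once it has elapsed, the message becomes a dead letter and can no longer be consumed from this queue.

There are two ways to set a message TTL:

  • Specify x-message-ttl when declaring the queue; every message in the queue then shares the same expiry time.
  • Set expiration on each message when publishing; expiry times can then differ from message to message.

If both are set, the smaller of the two applies. A TTL of 0 means the message is discarded unless it can be delivered to a consumer immediately on reaching the queue. Combining a TTL of 0 with a dead-letter exchange can stand in for the immediate flag, which RabbitMQ removed in 3.0.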
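The "smaller of the two" rule can be sketched in plain Java. The class and method names below are hypothetical and are not part of any RabbitMQ client; the only RabbitMQ names used are the x-message-ttl queue argument and the expiration message property, which is a string holding milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a RabbitMQ API): models how the effective TTL is chosen.
final class EffectiveTtl {

    // Queue arguments as they would be supplied when the queue is declared.
    static Map<String, Object> queueArguments(long queueTtlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", queueTtlMillis);
        return args;
    }

    // expiration is the per-message property: a string of milliseconds, or null if unset.
    // Returns -1 when neither the queue nor the message carries a TTL.
    static long effectiveTtlMillis(Map<String, Object> queueArgs, String expiration) {
        Object queueTtl = queueArgs.get("x-message-ttl");
        long fromQueue = queueTtl == null ? Long.MAX_VALUE : ((Number) queueTtl).longValue();
        long fromMessage = expiration == null ? Long.MAX_VALUE : Long.parseLong(expiration);
        long effective = Math.min(fromQueue, fromMessage);
        return effective == Long.MAX_VALUE ? -1 : effective;
    }

    public static void main(String[] args) {
        Map<String, Object> queueArgs = queueArguments(30000L);
        System.out.println(effectiveTtlMillis(queueArgs, "10000")); // 10000
        System.out.println(effectiveTtlMillis(queueArgs, null));    // 30000
    }
}
```

The values 30000 and 10000 are the ones used in the demo later in this article, which is why its messages arrive after roughly 10 seconds rather than 30.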

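2. Queue TTL

x-expires: RabbitMQ guarantees that the queue is deleted once this time has elapsed, but not how promptly. A queue expires after it has gone unused for that period, where unused means:

  • the queue has no consumers
  • the queue has not been redeclared
  • Basic.Get has not been called on the queue within the expiry period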

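3. x-dead-letter-exchange (RabbitMQ documentation): the dead-letter exchange (exchange types). When a message reaches its expiry time without being consumed, this exchange routes it, using the configured x-dead-letter-routing-key, to the bound queue, where a consumer finally consumes it. If x-dead-letter-routing-key is not configured, the message is routed with its own original routing key.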
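As a minimal sketch of the routing-key fallback just described, the following plain-Java helper builds the argument map for a buffer queue and picks the key a dead-lettered message would be republished with. The class, its methods, and the routing key order.created are hypothetical illustrations; only the three x- argument names come from RabbitMQ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds buffer-queue arguments and models the choice of
// routing key for a dead-lettered message. It does not talk to RabbitMQ.
final class DeadLetterArgs {

    static Map<String, Object> bufferQueueArguments(long ttlMillis, String deadLetterExchange,
                                                    String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        return args;
    }

    // Without x-dead-letter-routing-key the message keeps its original routing key.
    static String deadLetterRoutingKey(Map<String, Object> queueArgs, String originalRoutingKey) {
        Object configured = queueArgs.get("x-dead-letter-routing-key");
        return configured != null ? configured.toString() : originalRoutingKey;
    }

    public static void main(String[] args) {
        Map<String, Object> withKey = bufferQueueArguments(30000L, "exchange_delay_done", "delay");
        Map<String, Object> withoutKey = bufferQueueArguments(30000L, "exchange_delay_done", null);
        System.out.println(deadLetterRoutingKey(withKey, "order.created"));    // delay
        System.out.println(deadLetterRoutingKey(withoutKey, "order.created")); // order.created
    }
}
```

With the RabbitMQ Java client, a map like this would be passed as the last argument of channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments).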

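4. A message in a queue becomes a dead letter in any of the following cases:

  • the x-message-ttl or expiration has elapsed, i.e. the message expired
  • a consumer rejected the message (Basic.Reject / Basic.Nack) with the requeue parameter set to false
  • the queue reached its maximum length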
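The three cases can be written down as a small decision function. This is only an illustrative model with hypothetical names; the strings it returns are the values RabbitMQ records in the reason field of the x-death header (the demo output later in this article shows reason=expired).

```java
// Hypothetical model of the three dead-letter conditions; not a RabbitMQ API.
final class DeadLetterReason {
    // Values as they appear in the "reason" field of the x-death header.
    static final String EXPIRED = "expired";
    static final String REJECTED = "rejected";
    static final String MAXLEN = "maxlen";

    // Returns the dead-letter reason, or null when the message stays a normal message.
    static String classify(boolean ttlElapsed, boolean rejected, boolean requeue, boolean queueOverLimit) {
        if (ttlElapsed) {
            return EXPIRED;
        }
        if (rejected && !requeue) {
            return REJECTED;   // a reject with requeue=true puts the message back instead
        }
        if (queueOverLimit) {
            return MAXLEN;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, false, false, false));  // expired
        System.out.println(classify(false, true, false, false));  // rejected
        System.out.println(classify(false, true, true, false));   // null
    }
}
```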

 

II. Example Demo

  • A single delay queue

RabbitMQ delay queue logic:

[Figure: RabbitMQ delayed message queue]


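1. exchange_delay_begin: the exchange for the buffer queue; it routes messages into the buffer queue queue_delay_begin.

2. exchange_delay_done: the dead-letter exchange; it routes messages that expired in queue_delay_begin to the dead-letter queue.

3. queue_delay_begin: the buffer queue, where messages wait to expire. No consumer listens on it.

4. queue_delay_done: the dead-letter queue, which is the one consumers actually consume from.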
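To make the flow between these four components concrete, here is a toy in-memory model with a logical clock. It is a sketch for illustration only: it does not use RabbitMQ, the class and method names are hypothetical, and the two exchanges are reduced to the method calls that move a message between the two queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of the delay flow; it does not talk to RabbitMQ.
final class DelayFlowModel {

    // A message waiting in queue_delay_begin together with its expiry time.
    private static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long queueTtlMillis; // x-message-ttl of queue_delay_begin
    private final Deque<Pending> queueDelayBegin = new ArrayDeque<>();
    private final List<String> queueDelayDone = new ArrayList<>();

    DelayFlowModel(long queueTtlMillis) {
        this.queueTtlMillis = queueTtlMillis;
    }

    // exchange_delay_begin routes the message into queue_delay_begin, which has no consumer.
    void publish(String body, long nowMillis, long expirationMillis) {
        long ttl = Math.min(queueTtlMillis, expirationMillis);
        queueDelayBegin.addLast(new Pending(body, nowMillis + ttl));
    }

    // Advances the clock: expired messages at the head of queue_delay_begin are
    // dead-lettered through exchange_delay_done into queue_delay_done.
    void tick(long nowMillis) {
        while (!queueDelayBegin.isEmpty()
                && queueDelayBegin.peekFirst().expiresAtMillis <= nowMillis) {
            queueDelayDone.add(queueDelayBegin.pollFirst().body);
        }
    }

    // Messages a consumer listening on queue_delay_done could now receive.
    List<String> consumable() {
        return queueDelayDone;
    }

    public static void main(String[] args) {
        DelayFlowModel model = new DelayFlowModel(30000L);
        model.publish("hello", 0L, 10000L);
        model.tick(9999L);
        System.out.println(model.consumable()); // []
        model.tick(10000L);
        System.out.println(model.consumable()); // [hello]
    }
}
```

The point of the model is that the consumer never sees a message until it has expired in the buffer queue, which is what turns a TTL into a delay.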

spring-rabbitmq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context-3.1.xsd
                            http://www.springframework.org/schema/tx
                            http://www.springframework.org/schema/tx/spring-tx.xsd
                            http://www.springframework.org/schema/aop
                            http://www.springframework.org/schema/aop/spring-aop.xsd
                            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
                            http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
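    <!-- Configure the connection-factory; set the host, port, username and password attributes for your RabbitMQ server -->
    <rabbit:connection-factory id="connectionFactory" />
    <!-- Delay (buffer) exchange and queue -->
    <rabbit:direct-exchange name="exchange_delay_begin">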
        <rabbit:bindings>
            <rabbit:binding queue="queue_delay_begin" key="delay" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:queue name="queue_delay_begin" durable="false">
        <rabbit:queue-arguments>
            <!-- TTL in milliseconds applied to every message in this queue -->
            <entry key="x-message-ttl" value="30000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="exchange_delay_done" />
            <entry key="x-dead-letter-routing-key" value="delay" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:direct-exchange name="exchange_delay_done">
        <rabbit:bindings>
            <rabbit:binding queue="queue_delay_done" key="delay" />
            <!-- Bindings that share the binding key "delay" make the exchange route each message to several queues -->
            <!--<rabbit:binding queue="queue_delay_done_two" key="delay" />-->
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:queue name="queue_delay_done" durable="false"/>
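    <rabbit:template id="delayMsgTwoTemplate" connection-factory="connectionFactory" />
    <!-- The class package is an assumption; point it at wherever MessageConsumer lives -->
    <bean id="messageConsumer" class="com.nancy.rabbitmq.MessageConsumer"></bean>

    <!-- Message listener -->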
    <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" >
        <rabbit:listener queues="queue_delay_done" ref="messageConsumer" />
    </rabbit:listener-container>
</beans>

 DelayMessageProducer.java

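import javax.annotation.Resource;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Service;

@Service
public class DelayMessageProducer {
    @Resource(name="delayMsgTwoTemplate")
    private AmqpTemplate delayMsgTwoTemplate;

    // Publishes msg with a per-message TTL of 10 s; the smaller of this value
    // and the queue's x-message-ttl decides when the message is dead-lettered.
    public void delayMsgTwo(String exchange, String routingKey, Object msg) {
        delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // expiration is a string holding milliseconds
                message.getMessageProperties().setExpiration(String.valueOf(10000));
                return message;
            }
        });
    }
}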

 MessageConsumer.java

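import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

// Listens on queue_delay_done, so it only sees messages after they expired in queue_delay_begin.
public class MessageConsumer implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("consumer receive message 22------->:{}"+ message);
    }
}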

 application.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    <import resource="spring-rabbitmq.xml" />
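    <!-- Scan the given package and register annotated classes as Spring beans -->
    <context:component-scan base-package="com.nancy.rabbitmq" />
    <!-- Enable annotation processing on registered beans -->
    <context:annotation-config />
    <!-- Enable @Configurable support for objects created outside the container -->
    <context:spring-configured />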
</beans>

DelayQueueTest.java

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DelayQueueTest {
    private ApplicationContext context = null;

    @Before
    public void setUp() throws Exception {
        context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");
    }

    @Test
    public void delayQueueTest() throws Exception {
        DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);
        int a = 10;
        // Publish ten messages, 10 ms apart, to the buffer exchange.
        while (a > 0) {
            System.out.println("send "+ a);
            messageProducer.delayMsgTwo("exchange_delay_begin","delay", "hello world delay2 :" + a--);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("sended ");
        // Keep the test alive for 60 s so the listener can receive the expired messages.
        Thread.sleep(1000*60);
    }
}

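Result: about 10 s after the messages are sent, the consumer's listener receives and consumes them. The delay is 10 s rather than 30 s because the per-message expiration (10000 ms) is smaller than the queue's x-message-ttl (30000 ms).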

send 10
send 9
send 8
send 7
send 6
send 5
send 4
send 3
send 2
send 1
sended 
consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])