array(2) { ["docs"]=> array(10) { [0]=> array(10) { ["id"]=> string(3) "428" ["text"]=> string(77) "Visual Studio 2017 单独启动MSDN帮助(Microsoft Help Viewer)的方法" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(8) "DonetRen" ["tagsname"]=> string(55) "Visual Studio 2017|MSDN帮助|C#程序|.NET|Help Viewer" ["tagsid"]=> string(23) "[401,402,403,"300",404]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511400964" ["_id"]=> string(3) "428" } [1]=> array(10) { ["id"]=> string(3) "427" ["text"]=> string(42) "npm -v;报错 cannot find module "wrapp"" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(4) "zzty" ["tagsname"]=> string(50) "node.js|npm|cannot find module "wrapp“|node" ["tagsid"]=> string(19) "[398,"239",399,400]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511400760" ["_id"]=> string(3) "427" } [2]=> array(10) { ["id"]=> string(3) "426" ["text"]=> string(54) "说说css中pt、px、em、rem都扮演了什么角色" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(12) "zhengqiaoyin" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511400640" ["_id"]=> string(3) "426" } [3]=> array(10) { ["id"]=> string(3) "425" ["text"]=> string(83) "深入学习JS执行--创建执行上下文(变量对象,作用域链,this)" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(7) "Ry-yuan" ["tagsname"]=> string(33) "Javascript|Javascript执行过程" ["tagsid"]=> string(13) "["169","191"]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511399901" ["_id"]=> string(3) "425" } [4]=> array(10) { ["id"]=> string(3) "424" ["text"]=> string(30) "C# 排序技术研究与对比" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(9) "vveiliang" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(8) ".Net Dev" ["catesid"]=> string(5) "[199]" ["createtime"]=> string(10) "1511399150" ["_id"]=> string(3) "424" } [5]=> array(10) { ["id"]=> string(3) "423" ["text"]=> string(72) "【算法】小白的算法笔记:快速排序算法的编码和优化" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(9) "penghuwan" ["tagsname"]=> string(6) "算法" ["tagsid"]=> string(7) "["344"]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511398109" ["_id"]=> string(3) "423" } [6]=> array(10) { ["id"]=> string(3) "422" ["text"]=> string(64) "JavaScript数据可视化编程学习(二)Flotr2,雷达图" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(7) "chengxs" ["tagsname"]=> string(28) "数据可视化|前端学习" ["tagsid"]=> string(9) "[396,397]" ["catesname"]=> string(18) "前端基本知识" ["catesid"]=> string(5) "[198]" ["createtime"]=> string(10) "1511397800" ["_id"]=> string(3) "422" } [7]=> array(10) { ["id"]=> string(3) "421" ["text"]=> string(36) "C#表达式目录树(Expression)" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(4) "wwym" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(4) ".NET" ["catesid"]=> string(7) "["119"]" ["createtime"]=> string(10) "1511397474" ["_id"]=> string(3) "421" } [8]=> array(10) { ["id"]=> string(3) "420" ["text"]=> string(47) "数据结构 队列_队列实例:事件处理" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(7) "idreamo" ["tagsname"]=> string(40) "C语言|数据结构|队列|事件处理" ["tagsid"]=> string(23) "["246","247","248",395]" ["catesname"]=> string(12) "数据结构" ["catesid"]=> string(7) "["133"]" ["createtime"]=> string(10) "1511397279" ["_id"]=> string(3) "420" } [9]=> array(10) { ["id"]=> string(3) "419" ["text"]=> string(47) "久等了,博客园官方Android客户端发布" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(3) "cmt" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511396549" ["_id"]=> string(3) "419" } } ["count"]=> int(200) } 222 消息队列RabbitMQ(三):消息确认机制 - 爱码网
sang-bit

引言

RabbitMQ的模型是生产者发送信息到 Broker (代理),消费者从 Broker 中取出信息。但是生产者怎么知道消息是否真的发送到 Broker 中了呢?Broker 又怎么知道消息到底有没有被消费者消费?

如果由于网络原因出现故障,生产者生产的消息未到达 Broker 或者 Broker 的消息被虚假消费,而它们又不知道,就会产生很严重的问题,如重复消费等。

RabbitMQ的消息确认流程

image-20210519110039225

从图中可以看出:

消息确认机制分为生产者确认和消费者确认

  • ConfirmCallback 生产者
  • ReturnCallback 生产者
  • ACK 消费者

生产者确认

  • 消息到达RabbitMQ的Exchange:Exchange向生产者发送Confirm确认。成功抑或失败都会返回一个confirmCallback
  • 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回

消费者确认

  • 消费者收到消息后需要对 RabbitMQ Server 进行消息 ACK 确认,RabbitMQ 根据确认信息决定是删除队列中的该信息还是重新发送

代码实现

生产者确认

重点在于生产者重写下面两个方法

  • rabbitMQTemplate.setConfirmCallback

  • rabbitMQTemplate.setReturnCallback

  1. 开启生产者消息确认

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        virtual-host: /
        username: root
        password: root
        #    开启两个模式的生产者消息确认
        publisher-confirm-type: simple
        publisher-returns: true
    
  2. 声明交换机、队列,绑定交换机和队列

    @Configuration
    public class RabbitMQConfig {
    
        private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange";
        private static final String SB_TOPIC_QUEUE="sb_topic_queue1";
    
        // 注入交换机 topic类型
        @Bean("topicExchange")
        public Exchange topicExchange(){
            return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true)
                    .autoDelete().build();
        }
        // 声明队列
        @Bean
        public Queue queue1(){
            return QueueBuilder.durable(SB_TOPIC_QUEUE).build();
        }
    
        // 绑定队列和交换机
        @Bean
        public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs();
        }
    }
    
  3. 创建消费者

    @Component
    @RabbitListener(queues = "sb_topic_queue1")
    public class Consumer {
    
        @RabbitHandler
        public void testPublishConfirm(String msg) {
            System.out.println("收到的信息:"+msg);
        }
    }
    
  4. 创建生产者

    创建生产者发送消息到消息队列,模拟两种异常情况

    @SpringBootTest
    class RabiitmqSpringbootApplicationTests {
    
        @Autowired
        RabbitTemplate template;
    
        @Test
        void testConfirmTrue() {
            // 设置confirm回调函数
            template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                    if (b) System.out.println("消息发送成功");
                    else System.out.println("消息发送失败");
                }
    
            });
            // 模拟生产者发送信息--正常情况
            template.convertAndSend("sb_topic_exchange","user.info","日志级别:info;日志模块:user;日志信息:*****");
        }
    
        @Test
        void testConfirmFalse() {
            // 设置confirm回调函数
            template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) {
                    if (b) System.out.println("消息发送成功");
                    else System.out.println("消息发送失败");
                }
    
            });
            // 模拟生产者发送信息
            // 不存在的交换机--异常情况
         template.convertAndSend("sb_topic_exchange_noexist","user.info","日志级别:info;日志模块:user;日志信息:*****");
        }
    
        @Test
        void testReturnFalse() {
            // 设置return回调函数
            template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) {
                    System.out.println(message.toString());
                    System.out.println(s+"*********");
                }
    
            });
            template.setMandatory(true);
            // 模拟生产者发送信息
            // 正确的交换机 错误的routekey -- 异常情况
         template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
        }
    }
    

消费者确认

重点在于消费者的下面两个方法

  • channel.basicAck 消费者签收
  • channel.basicNAck 消费者拒绝签收
  1. 开启消费者确认模式

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        virtual-host: /
        username: root
        password: root
    #    设置消费端手动签收
        listener:
          direct:
            acknowledge-mode: manual
          simple:
            acknowledge-mode: manual
    
  2. 创建消费者

    /**
     * 注入消费者--手动签到
     */
    @Component
    @RabbitListener(queues = "sb_topic_queue1")
    public class Consumer2 {
    
        @RabbitHandler
        public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException {
            // 消费端设置手动签收代码
            try {
                System.out.println(msg);
                // 正常签收,mq收到此消息被正常签收后即可从队列中删除vi信息
                // 是哟了那个channel的方法
                // 第一个参数是deliverytag 标识哪条信息 第二个参数是是否批量签收
                // int i=2/0; 模拟异常
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
                System.out.println("消费者签收了该信息,服务器你可以删了");
            }catch (Exception e){
                // 异常拒绝签收,让mq重发此信息
                System.out.println("该信息丢了,给我重发");
           channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
               // 该信息丢了,但是不需要你重发
         // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
            }
        }
    }
    
    
  3. 创建生产者

    @SpringBootTest
    class RabiitmqSpringbootApplicationTests {
    
        @Autowired
        RabbitTemplate template;
    
        @Test
        void testConsumerAck() {
         template.convertAndSend("sb_topic_exchange","noexist.user.info","日志级别:info;日志模块:user;日志信息:*****");
        }
    }
    

参考

分类:

技术点:

相关文章:

  • 2017-11-21
  • 2020-04-22
  • 2021-11-28
  • 2018-04-28
  • 2021-10-27
  • 2018-08-26
  • 2018-03-20
  • 2018-08-14
猜你喜欢
  • 2019-05-21
  • 2021-10-27
  • 2021-10-27
  • 2019-03-21
  • 2018-11-14
  • 2020-01-17
  • 2021-10-27
相关资源
相似解决方案