Howlet

之前也学习过消息队列,但一直没有使用的场景,今天项目中遇到了 kafka 那便有了应用场景


1. Kafka

Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:


  • 异步处理:用户注册后发送邮件、短信、验证码等可以异步处理,使注册这个过程写入数据库后就可立即返回
  • 流量消峰:秒杀活动超过阈值的请求丢弃转向错误页面,然后根据消息队列的消息做业务处理
  • 日志处理:可以将error的日志单独给消息队列进行持久化处理
  • 应用解耦:购物的下单操作,订单系统与库存系统中间加消息队列,使二者解耦,若后者故障也不会导致消息丢失

之前 笔者也写过 RabbitMQ 的笔记,传送门







2. 生产消费模型

结合 kafka 的下面这些名词来解释其模型会更加容易理解

名称 解释
Broker kafka 的实例,部署多台 kafka 就是有多个 broker
Topic 消息订阅的话题,是这些消息的分类,类似于消息订阅的频道
Producer 生产者,负责往 kafka 发送消息
Consumer 消费者,从 kafka 读取消息来进行消费







3. 安装部署

kafka 和依赖的 zookeeper 是 java 编写的工具,其需要 jdk8 及其以上。笔者这里使用 Docker 安装,偷懒了贪图方便快捷

# 使用 wurstmeister 制作的镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka


# 启动 zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper


# 单机启动 kafka
docker run  -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka






4. Quickstart

kafka 官网也有很好的介绍,quickstart

# 进入kafka容器
docker exec -it kafka /bin/sh


# 进入 bin 目录
cd /opt/kafka_2.13-2.8.1/bin


# partitions 分区
# replication 副本因子
# 创建一个主题(参数不懂可直接填写,后面会讲解说明)
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092


# 查看
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092


# 写入 topic(回车表示一条消息,ctrl + c 结束输入)
# 消息默认存储 7 天,下一步的消费可以验证
./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event


# 读取 topic(运行多次可以读取消息,因为默认存储 7 天)
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092






5. SpringBoot 集成

SpringBoot 集成了 Kafka,添加依赖后可使用内置的 KafkaTemplate 模板方法来操作 kafka 消息队列


5.1 添加依赖

<!--  sprinboot版本管理中有kafka可不写版本号  -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>


5.2 配置文件

server:
  port: 8080

spring:
  # 消息队列
  kafka:
    producer:
      # broker地址,重试次数,确认接收个数,消息的编解码方式
      bootstrap-servers: 101.200.197.22:9092
      retries: 3
      acks: 1
      key-serializer: org.springframework.kafka.support.serializer.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.StringSerializer
    consumer:
      # broker地址,自动提交,分区offset设置
      bootstrap-servers: 101.200.197.22:9092
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


5.3 生产者

@RestController
@RequestMapping("/kafka")
public class Producer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/producer1")
    public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
        SendResult<String, Object> sendResult = future.get();
        return sendResult.toString();
    }

    @GetMapping("/producer2")
    public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("faile");
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("success");
            }
        });
        return "";
    }
}


5.4 消费者

@Component
public class Consumer {

    @KafkaListener(topics = {"topic1"})
    public void onMessage(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}






6. 存储目录结构

kafka
|____kafka-logs
    |____topic1
    |	  |____00000000000000000000.log(存储接收的消息)
    |	  |____consumer_offsets-01(消费者偏移量)
    |	  |____consumer_offsets-02
    |____topic2
    	  |____00000000000000000000.log
    	  |____consumer_offsets-01
    	  |____consumer_offsets-02

每台 broker 实例接收到消息后将之存储到 00000.log 里面,保存的方式是先入先出。消息被消费后不会被删除,相反可以设置 topic 的消息保留时间,重要的是 Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的


消费者会将自己消费偏移量 offset 提交给 topic 在 _consumer_offsets 里面保存,然后通过偏移量来确定消息的位置,默认从上次消费的位置开始,添加参数 --frombeginning 则从头开始消费,可获取之前所有存储的消息。kafka 也会定期清除内部的消息,直到保存最新的一条(文件保存的消息默认保存 7 天)







7. 消费组

这个在笔者配置消费者的时候发现的问题,启动时报错说没有指定消费组


  • 每条分区消息只能被同组的一个消费者消费,consumer1 和 consumer2 同组,所以只有其中一个能消费同条消息
  • 每条分区消息能被不同组的单个消费者消费,consumer2 和 consumer4 不同组,所以都能消费同条消息
  • 以上二个规则同时成立
  • 其作用是可以保证消费顺序,同个分区里的消息会被同个消费者顺序消费






8. 分区和副本

topic 消息保存的文件 0000.log 可以进行物理切分,这就是分区的概念,类似于数据库的分库分表。这样做的好处在于单个保存的文件不会太大从而影响性能,最重要的是分区后不是单个文件串行执行了,而是多区多文件可并行执行提高了并发能力




分区:消费者会消费同一 topic 的不同分区,所以会保存不同分区的偏移量,其格式为:GroupId + topic + 分区号

副本:副本是对分区的备份,集群中不同的分区在不同的 broker 上,但副本会对该分区备份到指定数量的 broker 上,这些副本有 leader 和 follower 的区别,leader负责读写,挂了再重新选举,副本为了保持数据一致性







9. 常见问题


9.1 生产者同步和异步消息

生产者发送消息给 broker,之后 broker 会响应 ack 给生产者,生产者等待接收 ack 信号 3 秒,超时则重试 3 次

生产者 ack 确认配置:

  • ack = 0:不需要同步消息
  • ack = 1:则 leader 收到消息,并保存到本地 log 之后才响应 ack 信息
  • ack 默认配置为 2


9.2 消费者自动提交和手动提交

  • 自动提交:消费者 pull 消息之后马上将自身的偏移量提交到 broker 中,这个过程是自动的
  • 手动提交:消费者 pull 消息时或之后,在代码里将偏移量提交到 broker
  • 二者区别:防止消费者 pull 消息之后挂掉,在消息还没消费但又提交了偏移量


9.3 消息丢失和重复消费

  • 消息丢失
    • 生产者:配置 ack ,以及配置副本和分区数值一致
    • 消费者:设置手动提交
  • 重复消费
    • 设置唯一主键,Mysql 主键唯一则插入失败
    • 分布式锁


9.4 顺序消费方案

  • 生产者:关闭重试,使用同步发送,成功了再发下一条
  • 消费者:消息发送到一个分区中,只有一个消费组的消费者能接收消息


相关文章: