前面介绍了消息的发送,这节主要介绍消息的存储。这里只关注普通消息,事务消息在后面介绍。
MQ存储方式
一般有分布式kv、文件系统、DB等,不同的MQ根据设计的不同有各自的选择,RocketMQ同kafka一样,选择了文件系统作为存储方式,在存储设计上借鉴了kafka。
RocketMQ与kafka对比
kafka将消息用topic+partition分割成不同的文件,在topic和partition数量较多的情况下,server端文件的顺序读写会变成随机读写,性能严重下降。RocketMQ则将所有topic的消息全部存储在一个文件中,这样,对于发消息的写文件操作来说,是完全的顺序写,性能不会因为topic数量产生影响。但是对于消费消息的读文件操作来说,则变为了完全随机的读操作,并且存储文件中会存在大量不属于本topic的消息。为了解决这个问题,RocketMQ在原始存储文件的基础上,为每一个topic建立了消息索引文件,消费消息时先读索引文件,再根据索引位置读取原始文件中的消息。这样,消息读取就变为顺序的了,不过却额外增加了一次文件读操作。
存储结构
存储结构如下:
- CommitLog为原始的存储文件
- ConsumeQueue为每个队列对应的消费索引文件
- IndexFile 为消息的索引文件
消息发送
producer发送消息时,broker将消息保存到CommitLog中,然后将CommitLog的内容dispatch到对应topic的ConsumeQueue中,即上面说的,所有topic消息保存在一个CommitLog中,每个topic有自己单独的ConsumeQueue。IndexFile作为消息的索引文件,同理。
消息消费
consumer消费消息时,先从对应topic的ConsumeQueue中读取消息的偏移量,并对消息做初步过滤。之后从CommitLog中读取原始消息。
存储性能分析
这里简单说下RocketMQ用到的page cache和Mmap。
page cache
page cache的详细内容请自行google。简单来说就是对于Linux系统所有的文件IO请求,操作系统都是通过page cache机制实现的,磁盘文件都是有一系列固定大小的数据块(4K,8K等)组成的。page cache从磁盘读取文件时,会进行预读取,即读入请求页面和紧随其后的几个页面,从而提高page cache命中率。RocketMQ的ConsumeQueue文件保存数据较少,并且是顺序读取,在page cache的加持下ConsumeQueue的读取性能会比较高,可以看做近乎于内存。
Mmap
关于Mmap可以看这篇文章Mmap。RocketMQ主要通过MappedByteBuffer(使用Mmap)对文件进行读写,将对文件的操作转化为直接对内存进行操作,从而极大提高了文件的读写效率。
存储层次设计
存储从上往下一共分为5个层次,如图
- 业务处理层:主要作用是broker发送、消费、查询消息的业务处理,提供最上层的功能。
- 数据文件存储组件层:主要是核心存储类DefaultMessageStore,通过该类完成对mq消息的读取和写入。
- 存储逻辑对象层:该层主要包含了RocketMQ数据文件存储直接相关的三个模型类IndexFile、ConsumerQueue和CommitLog。CommitLog存储原始消息,ConsumerQueue存储消费队列数据,IndexFile为索引存储。
- 文件内存映射对象层:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel两种方式完成数据文件的读写。其中,采用MappedByteBuffer这种内存映射磁盘文件的方式完成对大文件的读写,在RocketMQ中将该类封装成MappedFile类。
- 磁盘存储层:主要指的是部署RocketMQ服务器所用的磁盘。
后面按照分层进行介绍。
业务处理层
业务处理层包含三个对象QueryMessageProcessor、SendMessageProcessor、PullMessageProcessor,分别进行分析。
QueryMessageProcessor
查询消息,包含两种请求的处理,queryMessage() 和 viewMessageById(),具体逻辑委托给了DefaultMessageStore进行操作,后面针对DefaultMessageStore详细分析。
SendMessageProcessor
处理发送消息,对于事务消息,委托给TransactionalMessageServiceImpl 处理,非事务消息委托给DefaultMessageStore处理。
PullMessageProcessor
拉取消息,委托给DefaultMessageStore处理。