看RocketMQ源码看了一段时间,自己总结了下学习RocketMQ源码顺序:

1、RocketMQ架构,就是先了解下Producer、NameSrv、Broker、Consumer的简单关系。

2、RocketMQ基础概念。我最初时候是直接看源码的,没有看MQ里一些基础类的概念,也不清楚这些类,比如BrokerData、QueueData、MessageQueue、brokerAddrTable、topicQueueTable这些类的作用,所以直接看源码时很难看懂方法的作用,所以最好先把common里简单的类、数据结构了解清楚。

3、RocketMQ的nameSrv,nameSrv里面只有9个类,但写的非常简介,可以先把nameSrv搞明白。

broker集群图示如下:

1、RocketMQ基础概念

上图表示一个broker集群,集群名为cluster-a,里面有两个brokerName,分别为broker-a、broker-b,broker-a又包含了一个主broker和一个从broker,其中主broker的brokerId为0,ip为10.10.10.10,从broker的brokerId为1,ip为10.10.10.11。

pay:正常情况下一台机器只有一个ip,上面只能部署一个broker实例,一主一从要用两台机器。而虚拟机有虚拟网卡,一个虚拟机上面可以部署一个broker,nameSrv可以和broker一台机器,也可以不是同一台。

broker、topic、queue关系:

1、RocketMQ基础概念

  • rocketmq-common包:通用的一些类,方法,数据结构等

brokerAddrTable:HashMap<String/* brokerName */, BrokerData>;表示每个brokerName下面的一主多从的broker信息

BrokerData:里面有cluster、brokerName、brokerAddrs属性,cluster表示对应集群,brokerName表示brokerName下的一主多从broker,HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs 表示哪个cluster集群下哪个brokerName下哪个brokerId对应的broker地址

topicQueueTable:HashMap<String/* topic */, List<QueueData>> topicQueueTable;表示每个topic下面的消息队列详情

QueueData:描述了会有多少消息队列MessageQueue实例。每个topic是跨多个broker的,QueueData表示一个topic在哪个broker上,有多少读、写队列,即对应的消息队列MessageQueue信息,比如topic1跨两个broker,那么queueData里面的brokerName属性这个topic在哪个broker上,readQueueNums表示这个topic在这个broker上的读队列readMessageQueue的数量,writeQueueNums为写队列writeMessageQueue的数量,perm属性表示读/写/读和写。List<QueueData>就表示一个topic在哪一个broker上面的具体信息。如下图,表示topic-A在brokerName为broker-a和broker-b上各有8个读写队列。1、RocketMQ基础概念

MessagaQueue:里面有topic、brokerName、queueId属性,表示哪个broker上的哪个topic的queueId为**的消息队列TopicRouteData:定义了topic路由的相关信息,即一个topic到哪些brokerAddr去找等等,用于网络传输。List<QueueData>表示存储在哪些broker,有多少读写/队列,List<BrokerData>表示对应的broker集合,由List<QueueData>根据RouteInfoManager#pickupTopicRouteData方法得到。HashMap<String/* brokerAddr */, List<String>/* Filter Server */>表示brokerAddr对应的filterServer地址。

TopicRouteData与属性QueueData,BrokerData关系如下图:

1、RocketMQ基础概念

ConfigManager:配置管理器的父类,各种配置的管理接口,提供配置文件,加载配置,持久化配置,提供编码,解码方法。编码解码具体实现方法由实现类自己实现,比如TopicConfigManager的decode方法,先根据json串得到TopicConfigSerializeWrapper对象,如果wrapper非空,则获取topicConfigTable以及dataVersion,来更新自己的属性。

TopicConfig:定义topic的配置项,包括topicName、读对列、写队列个数,默认都是16个

dataVersion:一般用于表识数据(配置)的版本,用来辅助判断是否需要更新数据(配置)等

TopicConfigSerializeWrapper:包括topicConfigTable和dataVersion,该类继承了抽象类RemotingSerializable,封装topic配置以及版本号,用于网络传输配置以及版本。

TopicList:这个类用于将topic列表进行序列化,用于网络传输。

ClusterInfo:保存cluster信息提供给网络传输。里面有HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable,HashMap<String/* brokerName */, BrokerData> brokerAddrTable,可以查到clusterName下所有broker等。

SubscriptionData:用来形容要订阅什么样的消息数据, 并不是真正的被订阅的数据

  • rocketmq-broker包:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。

TopicConfigManager:继承ConfigManager,用于管理Topic的配置,里面有topicConfigTable和当前配置的版本dataVersion

topicConfigTable:ConcurrentMap<String, TopicConfig> topicConfigTable,key是topicName,value是主题对应的配置

  • rocketmq-namesrv包:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。

NamesrvStartup:NamesrvStartup 是 NameServer模块的启动入口类,启动nameSrv并注册钩子。

KVConfigManager:是namesrvController指定的kvConfig配置文件,HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable,可以添加、修改、查询,持久化到内存。

NamesrvConfig:NamesrvConfig是NameServer的配置类,这个类放在了common模块中,但是几乎都是被NameServer模块引用,这里就当在NameServer模块中讲。

NamesrvController:NamesrvController是NameServer服务的核心类   1、nameSrv启动 2、定时调用RouteInfoManager#scanNotActiveBroker方法,关闭2分钟没有往nameSrv发送心跳的brokerchannel连接,broker发送心跳时会更新BrokerLiveInfo的lastUpdateTimestamp值,如果这个值在当前时间的两分钟以前,说明已经2分钟没有发送心跳,关闭该brokerchannel连接。3、定时打印KVConfigManager,通过KvConfigManager.printAllPeriodically方法。

RouteInfoManager:记录路由信息。每个broker启动的时候会向namesrv注册,RouteInfoManager记录了broker注册的相关信息,Producer发送消息的时候根据topic获取路由到broker的信息, Consumer根据topic到namesrv获取topic的路由到broker的信息。RouteInfoManager使用HashMap定义了HashMap<String/* topic */, List<QueueData>> topicQueueTable,HashMap<String/* brokerName */, BrokerData> brokerAddrTable,HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable,HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable,HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable。

 

BrokerLiveInfo:代表一个活的broker连接,由最后更新时间,一个连接channel,数据版本和Ha地址组成,Broker定时向namesrv注册并更新BrokerLiveInfo的时间戳。

  • rocketmq-client包:提供发送、接受消息的客户端API。

TopicPublishInfo:

  • rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议

RemotingCommand:该类封装了RocketMQ的网络通信协议,服务器与客户端通过传递 RemotingCommand 对象来进行交互,包含RequestCode、CommandCustomHeader、body,以broker注册topic信息到nameSrv为例,org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker方法,其中RequestCode可以为REGISTER_BROKER,表示注册broker信息。可以用RegisterBrokerRequestHeader实现CommandCustomHeader,RegisterBrokerRequestHeader自定义clusterName、brokerName、brokerId、brokerAddr、haServerAddr,用于指定往哪个broker上面发送,body转为json后的发送数据,比如broker注册topic到nameSrv时的RegisterBrokerBody,里面由topic的配置和filterSer。

  • rocketmq-store:消息、索引存储等

CommitLog:

MappedFile:

MappedFileQueue:

  • rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!
  • rocketmq-tools:命令行工具

 

 

相关文章: