array(2) { ["docs"]=> array(10) { [0]=> array(10) { ["id"]=> string(3) "428" ["text"]=> string(77) "Visual Studio 2017 单独启动MSDN帮助(Microsoft Help Viewer)的方法" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(8) "DonetRen" ["tagsname"]=> string(55) "Visual Studio 2017|MSDN帮助|C#程序|.NET|Help Viewer" ["tagsid"]=> string(23) "[401,402,403,"300",404]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511400964" ["_id"]=> string(3) "428" } [1]=> array(10) { ["id"]=> string(3) "427" ["text"]=> string(42) "npm -v;报错 cannot find module "wrapp"" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(4) "zzty" ["tagsname"]=> string(50) "node.js|npm|cannot find module "wrapp“|node" ["tagsid"]=> string(19) "[398,"239",399,400]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511400760" ["_id"]=> string(3) "427" } [2]=> array(10) { ["id"]=> string(3) "426" ["text"]=> string(54) "说说css中pt、px、em、rem都扮演了什么角色" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(12) "zhengqiaoyin" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511400640" ["_id"]=> string(3) "426" } [3]=> array(10) { ["id"]=> string(3) "425" ["text"]=> string(83) "深入学习JS执行--创建执行上下文(变量对象,作用域链,this)" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(7) "Ry-yuan" ["tagsname"]=> string(33) "Javascript|Javascript执行过程" ["tagsid"]=> string(13) "["169","191"]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511399901" ["_id"]=> string(3) "425" } [4]=> array(10) { ["id"]=> string(3) "424" ["text"]=> string(30) "C# 排序技术研究与对比" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(9) "vveiliang" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(8) ".Net Dev" ["catesid"]=> string(5) "[199]" ["createtime"]=> string(10) "1511399150" ["_id"]=> string(3) "424" } [5]=> array(10) { ["id"]=> string(3) "423" ["text"]=> string(72) "【算法】小白的算法笔记:快速排序算法的编码和优化" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(9) "penghuwan" ["tagsname"]=> string(6) "算法" ["tagsid"]=> string(7) "["344"]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511398109" ["_id"]=> string(3) "423" } [6]=> array(10) { ["id"]=> string(3) "422" ["text"]=> string(64) "JavaScript数据可视化编程学习(二)Flotr2,雷达图" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(7) "chengxs" ["tagsname"]=> string(28) "数据可视化|前端学习" ["tagsid"]=> string(9) "[396,397]" ["catesname"]=> string(18) "前端基本知识" ["catesid"]=> string(5) "[198]" ["createtime"]=> string(10) "1511397800" ["_id"]=> string(3) "422" } [7]=> array(10) { ["id"]=> string(3) "421" ["text"]=> string(36) "C#表达式目录树(Expression)" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(4) "wwym" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(4) ".NET" ["catesid"]=> string(7) "["119"]" ["createtime"]=> string(10) "1511397474" ["_id"]=> string(3) "421" } [8]=> array(10) { ["id"]=> string(3) "420" ["text"]=> string(47) "数据结构 队列_队列实例:事件处理" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(7) "idreamo" ["tagsname"]=> string(40) "C语言|数据结构|队列|事件处理" ["tagsid"]=> string(23) "["246","247","248",395]" ["catesname"]=> string(12) "数据结构" ["catesid"]=> string(7) "["133"]" ["createtime"]=> string(10) "1511397279" ["_id"]=> string(3) "420" } [9]=> array(10) { ["id"]=> string(3) "419" ["text"]=> string(47) "久等了,博客园官方Android客户端发布" ["intro"]=> string(288) "目录 ECharts 异步加载 ECharts 数据可视化在过去几年中取得了巨大进展。开发人员对可视化产品的期望不再是简单的图表创建工具,而是在交互、性能、数据处理等方面有更高的要求。 chart.setOption({ color: [ " ["username"]=> string(3) "cmt" ["tagsname"]=> string(0) "" ["tagsid"]=> string(2) "[]" ["catesname"]=> string(0) "" ["catesid"]=> string(2) "[]" ["createtime"]=> string(10) "1511396549" ["_id"]=> string(3) "419" } } ["count"]=> int(200) } 222 apache kafka系列之源码分析走读-kafka内部模块分析 - 爱码网

转载至:http://blog.csdn.net/lizhitao/article/details/37911993


kafka整体结构分析:

kafka源代码工程目录结构如下图:

apache kafka系列之源码分析走读-kafka内部模块分析

下面只对core目录结构作说明,其他都是测试类或Java客户端代码


admin   --管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展 patitions

Api       --该模块主要负责交互数据的组装,客户端与服务端交互数据编解码

client    --该模块比较简单,就一个类,Producer读取kafka broker元数据信息,

topic和partitions,以及leader

cluster   --该模块包含几个实体类,Broker,Cluster,Partition,Replica,解释他们之间关系: Cluster由多个broker组成,一个Broker包含多个partition,一个topic的所有

partitions分布在不同broker的中,一个Replica包含多个Partition。

common     --通用模块,只包含异常类和错误验证

consumer    --consumer处理模块,负责所有客户端消费者数据和逻辑处理

contoroller  --负责中央控制器选举,partition的leader选举,副本分配,副本重新分配,

partition和replica扩容。

javaapi   --提供java的producer和consumer接口api

log          --Kafka文件存储模块,负责读写所有kafka的topic消息数据。

message    --封装多个消息组成一个“消息集”或压缩消息集。

metrics    --内部状态的监控模块

network        --网络事件处理模块,负责处理和接收客户端连接

producer        --producer实现模块,包括同步和异步发送消息。

serializer        --序列化或反序列化当前消息

kafka         --kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。

tools           --一看这就是工具模块,包含内容比较多:

a.导出对应consumer的offset值.

b.导出LogSegments信息,当前topic的log写的位置信息.

c.导出zk上所有consumer的offset值.

d.修改注册在zk的consumer的offset值.

f.producer和consumer的使用例子.

utils   --Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。



1.kafka启动类:kafka.Scala

kafka为kafka broker的main启动类,其主要作用为加载配置,启动report服务(内部状态的监控),注册释放资源的钩子,以及门面入口类。

kafka类代码如下:

......

 try {
      val props = Utils.loadProps(args(0))          //加载配置文件
      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)             //启动report服务(内部状态的监控)
      val kafkaServerStartble = new KafkaServerStartable(serverConfig)    //kafka server核心入口类
      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {                 //钩子程序,当jvm退出前,销毁所有资源
        override def run() = {
          kafkaServerStartble.shutdown
        }
      })


      kafkaServerStartble.startup
      kafkaServerStartble.awaitShutdown
    }

......

KafkaServerStartble类包装了KafkaSever类,其实啥都没有做。只是调用包装类而已

KafkaSever类是kafka broker运行控制的核心入口类,它是采用门面模式设计的。

apache kafka系列之源码分析走读-kafka内部模块分析


kafka中KafkaServer类,采用门面模式,是网络处理,io处理等得入口.

ReplicaManager    副本管理

KafkaApis    处理所有request的Proxy类,根据requestKey决定调⽤用具体的handler

KafkaRequestHandlerPool 处理request的线程池,请求处理池  <-- num.io.threads io线程数量

LogManager    kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据

TopicConfigManager  监听此zk节点的⼦子节点/config/changes/,通过LogManager更新topic的配置信息,topic粒度配置管理,具体请查看topic级别配置

KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息

KafkaController  kafka集群中央控制器选举,leader选举,副本分配。

KafkaScheduler  负责副本管理和日志管理调度等等

ZkClient         负责注册zk相关信息.

BrokerTopicStats  topic信息统计和监控

ControllerStats          中央控制器统计和监控


KafkaServer部分主要代码如下:

[java] view plain copy
 apache kafka系列之源码分析走读-kafka内部模块分析apache kafka系列之源码分析走读-kafka内部模块分析
  1. ......    
  2. def startup() {  
  3.     info("starting")  
  4.     isShuttingDown = new AtomicBoolean(false)  
  5.     shutdownLatch = new CountDownLatch(1)  
  6.   
  7.     /* start scheduler */  
  8.     kafkaScheduler.startup()  
  9.       
  10.     /* setup zookeeper */  
  11.     zkClient = initZk()  
  12.   
  13.     /* start log manager */  
  14.     logManager = createLogManager(zkClient)  
  15.     logManager.startup()  
  16.   
  17.     socketServer = new SocketServer(config.brokerId,  
  18.                                     config.hostName,  
  19.                                     config.port,  
  20.                                     config.numNetworkThreads,  
  21.                                     config.queuedMaxRequests,  
  22.                                     config.socketSendBufferBytes,  
  23.                                     config.socketReceiveBufferBytes,  
  24.                                     config.socketRequestMaxBytes)  
  25.     socketServer.startup()  
  26.   
  27.     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)  
  28.     kafkaController = new KafkaController(config, zkClient)  
  29.       
  30.     /* start processing requests */  
  31.     apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)  
  32.     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)  
  33.      
  34.     Mx4jLoader.maybeLoad()  
  35.   
  36.     replicaManager.startup()  
  37.   
  38.     kafkaController.startup()  
  39.       
  40.     topicConfigManager = new TopicConfigManager(zkClient, logManager)  
  41.     topicConfigManager.startup()  
  42.       
  43.     /* tell everyone we are alive */  
  44.     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)  
  45.     kafkaHealthcheck.startup()  
  46.   
  47.       
  48.     registerStats()  
  49.     startupComplete.set(true);  
  50.     info("started")  
  51.   }  
  52.     
  53.   private def initZk(): ZkClient = {  
  54.     info("Connecting to zookeeper on " + config.zkConnect)  
  55.     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)  
  56.     ZkUtils.setupCommonPaths(zkClient)  
  57.     zkClient  
  58.   }  
  59.   
  60.   /** 
  61.    *  Forces some dynamic jmx beans to be registered on server startup. 
  62.    */  
  63.   private def registerStats() {  
  64.     BrokerTopicStats.getBrokerAllTopicsStats()  
  65.     ControllerStats.uncleanLeaderElectionRate  
  66.     ControllerStats.leaderElectionTimer  
  67.   }  
  68. .......  
apache kafka系列之源码分析走读-kafka内部模块分析


相关文章: