本文主要介绍kafka的基础知识,文章较长。
Kafka:
kafka是什么:
Linked in公司开发(国外公司)。外表像一个消息中间件,但不仅仅是一个消息中间件。kafka支持消息的发布订阅(消息中间件功能),可进行流处理,在大数据领域可以看做是一个实时版的hadoop。与普通消息中间件区别:1.天生支持分布式,可以以集群方式运行。2.磁盘存储数据。3.可以进行流处理数据,极大增加了处理速度和数据量。与hadoop的区别:kafka与hadoop的关注点不同,hadoop可以定期计算大量的数据文件,kafka的实时性比较高,可以持续处理数据文件,所以kafka用来处理核心业务,hadoop用来进行数据分析。
kafka中的一些基本概念:
消息:数据单元,和其他消息中间件的消息概念一样。消息由字节数组 组成。
*:一组消息,为了提高效率,将消息组成一批 然后一次性提交。不过是否使用批次或者一批多少数据 需要权衡项目具体情况。因为需要考虑批次的数据量或者时间延迟等问题。
主题和分区:Kafka 里的消息用主题进行分类(主题好比数据库中的表),主题下有可以被分为若干个分区(分表技术)。分区本质上是个提交日志文件,有新消 息,这个消息就会以追加的方式写入分区(写文件的形式),然后用先入先出的顺序读取。每个分区里的数据按顺序存储,但一个主题下的数据无法区分顺序(同个主题会有多个分区,分区与分区之间无法区分顺序),因为同一个主题可以跨服务器,所以同一个主题下的分区可以在多个服务器上。
生产者和消费者、偏移量、消费者群组:和其他消息中间件一样,生产者就是生产消息的,消费者是使用消息的。生产者默认情况下把消息均衡分布到主题的所有分区上,如果需要指定分区,则需要使用消息里的消息键和分区器(消息键key:多个分区时,可以根据key来确定写入哪个分区。分区器:kafka中用来分配消息写入分区的东西)。
偏移量:一种元数据,不断递增的整数值,用来表示数据在kafka中的位置。消费者订阅一个或多个主题,并按顺序读取(使用偏移量判断读取到哪了)。
消费者群组:多个消费者可以构成一个消费者群组。怎么构成?共同读取一个主题的消费者们,就形成了一个群组。群组可以保证每个分区只被一个消费者使用。消费者和分区之间的这种映射关系叫做消费者对分区的所有权关系,很明显,一个分区只有一个消费者,而一个消费者可以有多个分区。
Broker和集群:一个Kafka服务器被称为一个Broker,多个服务器构成一个集群。单个 broker 可以处理上千个分区和每秒百万级的消息量(需要合适的硬件和操作系统、JVM等调优才行)。一个集群会有一主多从,以分区为单位,例如BrokerA和BrokerB,主题A有两个分区b和c,分区b的主在brokerA中,分区c的主可以在brokerB中。集群中 Kafka 内部一般使用管道技术进行高效的复制。
消息存储:
broker 默认的保留策略是:要么保留一段时间(7 天,7天之内没有新数据写入,保留时间从最后修改开始计时),要么保留一定大小(比如 1 个 G)。 到了限制,旧消息过期并删除。但是每个主题可以根据业务需求配置自己的保留策略。按照保留大小存储:一般配置中使用log.segment.bytes属性而不是使用log.retention.bytes。两者区别:log.retention.bytes每个分区文件最大限制,超过这个限制或超过保留时间(7天)后会自动删除。log.segment.bytes是将分区日志切分成日志片段,超过设置的大小后会创建一个新的片段,旧片段超过保留时间(7天)后会自动删除。
Kafka优点:
1.多生产者和多消费者
2.基于磁盘的数据存储,换句话说,Kafka 的数据天生就是持久化的。
3.高伸缩性,Kafka 一开始就被设计成一个具有灵活伸缩性的系统,对在线集群的伸缩丝毫不影响整体系统的可用性。
4.高性能,结合横向扩展生产者、消费者和 broker,Kafka 可以轻松处理巨大的信息流(LinkedIn 公司每天处理万亿级数据),同时保证亚秒级的消息 延迟。
使用场景:
活动跟踪(较多)
跟踪网站用户和前端应用发生的交互,比如页面访问次数和点击,将这些信息作为消息发布到一个或者多个主题上,这样就可以根据这些数据为机 器学习提供数据,更新搜素结果等等(头条、淘宝等总会推送你感兴趣的内容,其实在数据分析之前就已经做了活动跟踪)。
传递消息
标准消息中间件的功能
收集指标和日志(较多)
收集应用程序和系统的度量监控指标,或者收集应用日志信息,通过 Kafka 路由到专门的日志搜索系统,比如 ES。(国内用得较多)
提交日志
收集其他系统的变动日志,比如数据库。可以把数据库的更新发布到 Kafka 上,应用通过监控事件流来接收数据库的实时更新,或者通过事件流将数 据库的更新复制到远程系统。
还可以当其他系统发生了崩溃,通过重放日志来恢复系统的状态。(异地灾备)
流处理
操作实时数据流,进行统计、转换、复杂计算等等。随着大数据技术的不断发展和成熟,无论是传统企业还是互联网公司都已经不再满足于离线批 处理,实时流处理的需求和重要性日益增长 。 近年来业界一直在探索实时流计算引擎和 API,比如这几年火爆的SparkStreaming、KafkaStreaming、Beam 和 Flink,其中阿里双 11 会场展示的实时 销售金额,就用的是流计算,是基于 Flink,然后阿里在其上定制化的 Blink。
kafka基本操作:
列出所有主题 kafka-topics.bat–zookeeperlocalhost:2181–list
列出所有主题的详细信息 kafka-topics.bat–zookeeperlocalhost:2181–describe
创建主题 :主题名 my-topic,1 副本,8 分区 kafka-topics.bat–zookeeperlocalhost:2181–create–topicmy-topic–replication-factor1–partitions8
增加分区,注意:分区无法被删除 kafka-topics.bat–zookeeperlocalhost:2181–alter–topicmy-topic–partitions16
删除主题 kafka-topics.bat–zookeeperlocalhost:2181–delete–topicmy-topic
创建生产者(控制台) kafka-console-producer.bat–broker-listlocalhost:9092–topicmy-topic
创建消费者(控制台) kafka-console-consumer.bat–bootstrap-serverlocalhost:9092–topicmy-topic–from-beginning
列出消费者群组(仅 Linux) kafka-topics.sh–new-consumer–bootstrap-serverlocalhost:9092–list
#列出消费者群组详细信息(仅 Linux) kafka-topics.sh–new-consumer–bootstrap-serverlocalhost:9092–describe–group 群组名
Broker配置:
配置文件位于config 目录中,主要在server.properties中。
broker.id 在单机时无需修改,但在集群下部署时往往需要修改。它是个每一个 broker 在集群中的唯一表示,要求是正数。当该服务器的 IP 地址发生改变时, broker.id 没有变化,则不会影响 consumers 的消息情况
listeners 监听列表(以逗号分隔 不同的协议(如 plaintext,trace,ssl、不同的 IP 和端口)),hostname 如果设置为 0.0.0.0 则绑定所有的网卡地址;如果 hostname 为空 则绑定默认的网卡。如果 没有配置则默认为 java.net.InetAddress.getCanonicalHostName()。
如:PLAINTEXT://myhost:9092,TRACE://:9091 或 PLAINTEXT://0.0.0.0:9092, zookeeper.connect zookeeper 集群的地址,可以是多个,多个之间用逗号分割。(一组 hostname:port/path 列表,hostname 是 zk 的机器名或 IP、port 是 zk 的端口、/path 是可选 zk 的路径,如果不指定,默认使用根路径)
log.dirs Kafka 把所有的消息都保存在磁盘上,存放这些数据的目录通过 log.dirs 指定。可以使用多路径,使用逗号分隔。如果是多路径,Kafka 会根据“最少 使用”原则,把同一个分区的日志片段保存到同一路径下。会往拥有最少数据分区的路径新增分区。
num.recovery.threads.per.data.dir 每数据目录用于日志恢复启动和关闭时的线程数量。因为这些线程只是服务器启动(正常启动和崩溃后重启)和关闭时会用到。所以完全可以设置 大量的线程来达到并行操作的目的。注意,这个参数指的是每个日志目录的线程数,比如本参数设置为 8,而 log.dirs 设置为了三个路径,则总共会启动 24 个线程。 auto.create.topics.enable 是否允许自动创建主题。如果设为 true,那么 produce(生产者往主题写消息),consume(消费者从主题读消息)或者 fetchmetadata(任意客户端 向主题发送元数据请求时)一个不存在的主题时,就会自动创建。缺省为 true。
主题配置
新建主题的默认参数
num.partitions 每个新建主题的分区个数(分区个数只能增加,不能减少 )。这个参数一般要评估,比如,每秒钟要写入和读取 1000M 数据,如果现在每个消费者 每秒钟可以处理 50MB 的数据,那么需要 20 个分区,这样就可以让 20 个消费者同时读取这些分区,从而达到设计目标。(一般经验,把分区大小限制在 25G 之内比较理想)
log.retention.hours 日志保存时间,默认为 7 天(168 小时)。超过这个时间会清理数据。bytes 和 minutes 无论哪个先达到都会触发。与此类似还有 log.retention.minutes 和 log.retention.ms,都设置的话,优先使用具有最小值的那个。(提示:时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。也就 是最后修改时间是指日志片段的关闭时间,也就是文件里最后一个消息的时间戳)
log.retention.bytes topic 每个分区的最大文件大小,一个 topic 的大小限制 = 分区数*log.retention.bytes。-1 没有大小限制。log.retention.bytes 和 log.retention.minutes 任意一个达到要求,都会执行删除。(注意如果是 log.retention.bytes 先达到了,则是删除多出来的部分数据),一般不推荐使用最大文件删除策略,而是推 荐使用文件过期删除策略。
log.segment.bytes 分区的日志存放在某个目录下诸多文件中,这些文件将分区的日志切分成一段一段的,我们称为日志片段。这个属性就是每个文件的最大尺寸;当 尺寸达到这个数值时,就会关闭当前文件,并创建新文件。被关闭的文件就开始等待过期。默认为 1G。 如果一个主题每天只接受 100MB 的消息,那么根据默认设置,需要 10 天才能填满一个文件。而且因为日志片段在关闭之前,消息是不会过期的,所 以如果 log.retention.hours 保持默认值的话,那么这个日志片段需要 17 天才过期。因为关闭日志片段需要 10 天,等待过期又需要 7 天。
log.segment.ms 作用和 log.segment.bytes 类似,只不过判断依据是时间。同样的,两个参数,以先到的为准。这个参数默认是不开启的。
message.max.bytes 表示一个服务器能够接收处理的消息的最大字节数,注意这个值 producer 和 consumer 必须设置一致,且不要大于 fetch.message.max.bytes 属性的值 (消费者能读取的最大消息,这个值应该大于或等于 message.max.bytes)。该值默认是 1000000 字节,大概 900KB~1MB。如果启动压缩,判断压缩后的值。 这个值的大小对性能影响很大,值越大,网络和 IO 的时间越长,还会增加磁盘写入的大小。 Kafka 设计的初衷是迅速处理短小的消息,一般 10K 大小的消息吞吐性能最好(LinkedIn 的 kafka 性能测试)。
硬件对性能影响:
磁盘吞吐量/磁盘容量 *:因为是基于硬盘存储,写操作当内存不够后会直接写入硬盘。所以硬盘影响很大。
内存:读操作为了效率所以从内存中读取,当内存小的情况下,写的数据由于内存不够会直接写入硬盘。所以读时还需要将硬盘的数据读到内存中。所以内存对性能影响大。
网络(**):网络吞吐量决定了能处理的数据流量。
CPU:对性能要求不是很高。