1、topic参数覆盖问题

Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值,这就是所谓的 Topic 级别参数。

2、topic数据全局、局部留存

在实际生产环境中,如果为所有 Topic 的数据都保存相当长的时间,这样做既不高效也无必要。更适当的做法是允许不同部门的 Topic 根据自身业务需要,设置自己的留存时间。如果只能设置全局 Broker 参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时设置 Topic 级别参数把它覆盖,就是一个不错的选择。

3、JVM xms=6GB

将你的 JVM 堆大小设置成 6GB 吧,这是目前业界比较公认的一个合理值

4、JDK 版本

另外 Kafka 自 2.0.0 版本开始,已经正式摒弃对 Java 7 的支持了,所以有条件的话至少使用 Java 8 吧。

5、kafka-configs脚本趋势

并且在未来,Kafka 社区很有可能统一使用kafka-configs脚本来调整 Topic 级别参数。

6、GC选择

很多人使用默认的 Heap Size 来跑 Kafka,说实话默认的 1GB 有点小,毕竟 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,Heap Size 不能太小。JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器:如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。当然了,如果你已经在使用 Java 8 了,那么就用默认的 G1 收集器就好了。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。

更改:Java8默认GC是Parallel GC,Java9默认才是G1
G1是jdk9中默认的,jdk8还是需要显式指定的

在启动 Kafka Broker 之前,先设置上这两个环境变量:

$> export KAFKA_HEAP_OPTS=–Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

7、ulimit -n

ulimit -n。我觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响

8、Page Cache

向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了

随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。

9、不同情况下message的最大字节数

修改 Topic 级 max.message.bytes,还要考虑以下两个吧?
还要修改 Broker的 replica.fetch.max.bytes 保证复制正常
消费还要修改配置 fetch.message.max.bytes

message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起

replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。

fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。

10、分区策略

Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。

11、kafka两大消息格式

目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。

对两个版本分别做了一个简单的测试,结果显示,在相同条件下,不论是否启用压缩,V2 版本都比 V1 版本节省磁盘空间。当启用压缩时,这种节省空间的效果更加明显

12、compression&decompression

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。

compression.type在生产端以及broker端都有

其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。
情况一:Broker 端指定了和 Producer 端不同的压缩算法。
情况二:Broker 端发生了消息格式转换。

通常来说解压缩发生在消费者程序中

那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。如果用一句话总结一下压缩和解压缩,那么我希望你记住这句话:Producer 端压缩、Broker 端保持、Consumer 端解压缩。

除了在 Consumer 端解压缩,Broker 端也会进行解压缩

  • 消息格式转换时发生的解压缩
  • 做消息校验而引入的解压缩

13、压缩算法

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)

看一个压缩算法的优劣,有两个重要的指标:一个指标是压缩比,另一个指标就是压缩 / 解压缩吞吐量
Kafka极客杂谈——摘自胡夕
对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

Zstandard 算法(简写为 zstd)

14、kafka丢消息?

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
Kafka 是能做到不丢失消息的,只不过这些消息必须是已提交的消息,而且还要满足一定的条件

15、producer.send(msg)

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”

使用producer.send(msg)方式发送消息而造成消息丢失的原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。

Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

16、丢消息解决方案

1、不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
2、设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
3、设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
4、设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
5、设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
6、设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。(acks=all表示消息要写入所有ISR副本,但没要求ISR副本有多少个。min.insync.replicas做了这样的保证)
7、确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
8、确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的

其实,Kafka 还有一种特别隐秘的消息丢失场景:增加主题分区。当增加主题分区后,在某段“不凑巧”的时间间隔后,Producer 先于 Consumer 感知到新增加的分区,而 Consumer 设置的是“从最新位移处”开始读取消息,因此在 Consumer 感知到新分区前,Producer 发送的这些消息就全部“丢失”了,或者说 Consumer 无法读取到这些消息。严格来说这是 Kafka 设计上的一个小缺陷,你有什么解决的办法吗?

分类:

技术点:

相关文章: