【发布时间】:2020-04-27 14:23:37
【问题描述】:
我正在尝试从 Kafka 主题中删除特定消息或记录。我知道卡夫卡不是为了做到这一点而建立的。但是是否可以使用主题压缩,并能够使用特定的 Kafka 密钥将记录替换为空记录?如何做到这一点?
谢谢
【问题讨论】:
-
嗨@CMPE,下面的回答是否回答了您的问题?
标签: apache-kafka
我正在尝试从 Kafka 主题中删除特定消息或记录。我知道卡夫卡不是为了做到这一点而建立的。但是是否可以使用主题压缩,并能够使用特定的 Kafka 密钥将记录替换为空记录?如何做到这一点?
谢谢
【问题讨论】:
标签: apache-kafka
是的,如果您有一个紧凑的主题,您可以删除特定的消息。
在这种情况下,您的消息密钥将成为标识符。如果您想删除特定消息,则需要向主题发送具有相同键和空值的消息。这称为tombstone 消息。 Kafka 会将这个墓碑保留一段可配置的时间(以便您的消费者可以处理删除)。在这个设定的时间之后,cleaner 线程将删除 tombstone 消息,并且密钥将从 Kafka 的分区中消失。
一般来说,请注意,旧的(要删除的)消息不会立即消失。根据配置,可能需要一些时间才能替换单个消息。
我发现这个配置摘要很有帮助 (link to blog)
1) 激活压缩清理策略
cleanup.policy=compact应该放置2) 只要消费者在小于主题配置
delete.retention.ms的时间段内到达日志头部(默认为 24 小时),消费者就会看到所有墓碑。3)这些线程的数量可以通过
log.cleaner.threadsconfig配置4) 清理线程然后选择脏率最高的日志。
dirty ratio = the number of bytes in the head / total number of bytes in the log(tail + head)5) 主题配置
min.compaction.lag.ms用于保证在压缩消息之前必须经过的最短时间。6) 要设置延迟以在写入后开始压缩记录,请使用主题配置
log.cleaner.min.compaction.lag.ms。直到这段时间之后,记录才会被压缩。该设置让消费者有时间获取每条记录。
log compaction 被介绍为
日志压缩确保 Kafka 在单个主题分区的数据日志中始终保留每个消息键的至少最后一个已知值。
它的保证列在here:
日志压缩由日志清理器处理,这是一个后台线程池,用于重新复制日志段文件,删除其键出现在日志头部的记录。每个 compactor 线程的工作方式如下:
1) 选择日志头与日志尾比例最高的日志
2) 它为日志头部的每个键创建最后偏移量的简洁摘要
3) 它从头到尾重新复制日志,删除日志中稍后出现的键。新的、干净的段会立即交换到日志中,因此所需的额外磁盘空间只是一个额外的日志段(不是日志的完整副本)。
4)日志头的摘要本质上只是一个空间紧凑的哈希表。每个条目正好使用 24 个字节。因此,使用 8GB 的清理缓冲区,一次清理迭代可以清理大约 366GB 的日志头(假设有 1k 条消息)。
【讨论】: