【问题标题】:How to get consumer offset value of kafka如何获取kafka的消费者偏移值
【发布时间】:2018-01-19 13:22:15
【问题描述】:
我有producer,它正在不断地向一个主题发送消息。
假设我的consumer 消费了 1 到 10 条消息,在消费第 11 条消息之前它崩溃了,当它返回时,producer 产生了 100 条消息,假设现在消息是 110,我知道当 consumer 加入一个consumer group 它将获取最后提交的偏移量,因此它将重新开始从 11 读取,但我想使用 java 在日志中打印这些偏移量值并确保它不会错过任何消息
另外,我们如何在kafka 中获得主题明智的 TTL
【问题讨论】:
标签:
java
apache-kafka
kafka-consumer-api
【解决方案1】:
从ConsumerRecord可以得到一个topic对应的所有元数据
kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
String kafkaMessages = null
try{
while(true){
ConsumerRecords kafkaRecords
kafkaRecords = kafkaConsumer.poll(100)
for(ConsumerRecord record: kafkaRecords){
partition = record.partition()
offset = record.offset()
topicName = record.topic()
Object message = record.value()
}