【问题标题】:How to get consumer offset value of kafka如何获取kafka的消费者偏移值
【发布时间】:2018-01-19 13:22:15
【问题描述】:

我有producer,它正在不断地向一个主题发送消息。

假设我的consumer 消费了 1 到 10 条消息,在消费第 11 条消息之前它崩溃了,当它返回时,producer 产生了 100 条消息,假设现在消息是 110,我知道当 consumer 加入一个consumer group 它将获取最后提交的偏移量,因此它将重新开始从 11 读取,但我想使用 java 在日志中打印这些偏移量值并确保它不会错过任何消息

另外,我们如何在kafka 中获得主题明智的 TTL

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    从ConsumerRecord可以得到一个topic对应的所有元数据

    kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
                String kafkaMessages = null
                try{
                    while(true){
                       ConsumerRecords kafkaRecords
                       kafkaRecords = kafkaConsumer.poll(100)
                         for(ConsumerRecord record: kafkaRecords){
                           partition = record.partition()
                           offset = record.offset()
                           topicName = record.topic()
                             Object message = record.value()
                           }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-03-11
      • 1970-01-01
      • 2017-07-22
      • 1970-01-01
      • 2019-04-04
      • 2016-12-06
      • 1970-01-01
      • 2020-06-11
      相关资源
      最近更新 更多