【问题标题】:Kafka Consumer - Poll behaviour卡夫卡消费者 - 投票行为
【发布时间】:2016-10-22 22:33:25
【问题描述】:

关于 KafkaConsumer (>=0.9),我在尝试实施满足我需求的解决方案时遇到了一些严重问题。

假设我有一个函数,它只需要读取来自 kafka 主题的 n 条消息。

例如:getMsgs(5) --> 获取主题中的下 5 条 kafka 消息。

所以,我有一个看起来像这样的循环。使用实际正确的参数进行编辑。在这种情况下,消费者的 max.poll.records 参数设置为 1,因此实际循环只迭代了一次。不同的消费者(其中一些通过许多消息迭代)共享一个抽象的父亲(这个),这就是它被编码的原因。 numMss 部分是针对该消费者的临时部分。

for (boolean exit= false;!exit;)
{
   Records = consumer.poll(config.pollTime);
   for (Record r:records) 
   {
       processRecord(r); //do my things
       numMss++;
       if (numMss==maximum) //maximum=5
       {   
          exit=true;
          break;
       }
   }
}

考虑到这一点,问题在于 poll() 方法可能会收到超过 5 条消息。例如,如果它收到 10 条消息,我的代码将永远忘记其他 5 条消息,因为 Kafka 会认为它们已经被消费了。

我尝试提交偏移量但似乎不起作用:

    consumer.commitSync(Collections.singletonMap(partition,
    new OffsetAndMetadata(record.offset() + 1)));

即使使用偏移配置,每当我再次启动消费者时,它不会从第 6 条消息开始记住,我只想要 5 条消息),但是从 11 日 开始(因为第一次投票消耗了 10 条消息)。

是否有任何解决方案,或者也许(最肯定)我错过了什么?

提前致谢!!

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    您可以将max.poll.records 设置为您喜欢的任何数字,这样您最多可以在每次投票中获得那么多记录。

    对于您在此问题中陈述的用例,您不必自己明确提交偏移量。您可以将enable.auto.commit 设置为true 并将auto.offset.reset 设置为earliest 这样它就会在没有消费者group.id 时启动(换句话说,当您要开始从第一个分区读取时间)。一旦你有一个 group.id 和一些存储在 Kafka 中的消费者偏移量,如果你的 Kafka 消费者进程死亡,它将从最后提交的偏移量继续,因为这是默认行为,因为当消费者启动时,它会首先查找是否有任何提交的偏移量,如果是,将从最后提交的偏移量继续,auto.offset.reset 不会启动。

    【讨论】:

    • 多年后,仍然感谢这个答案。注意到我没有明确感谢你,所以......迟到总比没有好。非常感谢
    【解决方案2】:

    将 auto.offset.reset 属性设置为“最新”。然后尝试consume,你会从committed offset获取消费记录。

    或者你在 poll 之前使用 consumer.seek(TopicPartition, offset) api。

    【讨论】:

    • auto.offset.reset 应该是最早的,并且只有在没有消费者 group.id 时才会启动。没有组 ID 就无法存储偏移量。如果已经有一个消费者组 id auto.offset.reset 不会做任何事情,默认情况下消费者从最后提交的偏移量中选择。
    【解决方案3】:

    您是否通过将 enable.auto.commit 设置为 false 来禁用自动提交。如果要手动提交偏移量,则需要禁用它。如果没有下次调用 poll() 将自动提交您从之前 poll() 收到的消息的最新偏移量。

    【讨论】:

      【解决方案4】:

      从 Kafka 0.9 开始,auto.offset.reset 参数名称已更改;

      当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,因为该数据已被删除):

      earliest: automatically reset the offset to the earliest offset
      
      latest: automatically reset the offset to the latest offset
      
      none: throw exception to the consumer if no previous offset is found for the consumer's group
      
      anything else: throw exception to the consumer.
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-07-03
        • 2018-05-05
        • 2021-08-22
        • 1970-01-01
        • 1970-01-01
        • 2020-10-28
        相关资源
        最近更新 更多