【问题标题】:When Kafka auto commit is enable, does same offset get committed multiple times?启用 Kafka 自动提交时,是否多次提交相同的偏移量?
【发布时间】:2018-06-10 14:56:02
【问题描述】:

让我们假设 enable.auto.commit=true,并假设我从中读取消息的主题有很长一段时间不活动(比方说 48 小时没有消息)。结果连续的 poll() 调用在 48 小时内不会返回任何消息,我的问题是:

是否会在 __consumer_offsets 主题中的每个 auto.commit.interval.ms 中一次又一次地提交最后返回消息的偏移量(48 小时相同),该主题已压缩并且谁的过期时间由 offsets.retention.minutes 控制?

一次又一次地提交会阻止 __consumer_offsets 主题中的记录过期并在某个时候被删除。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    这很有趣。

    编辑:根据最近的评论,对此进行更新。更新的部分被划线并明确标记或以斜体标记。

    我会选择 "No" "Yes" 即最后返回的消息的偏移量将 NOT 一次又一次地提交,如果没有新的消息已到达主题。

    这里有同样的解释。

    典型的消费者示例如下所示:

    Properties props = new Properties();
    <other-properties>
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
    }
    

    因此,偏移提交的责任通常由消费者承担,并由轮询循环驱动。

    现在,在您描述的最后一次提交之后的场景中,每次调用 poll() 方法都会返回一个空映射。 所以,如果poll() 没有返回记录,那么就没有要提交的新偏移量。

    以下是我如何追溯 Kafka 的源代码并得出这个结论的。以下return语句来自poll()给定的here方法定义

    return ConsumerRecords.empty();
    

    file 中可用的empty() 方法的定义。

    编辑:以下部分是根据 Gwen 的评论新增的内容。

    然而,在返回空映射之前,通过KafkaConsumer 类的poll() 方法调用另一个poll() 方法(位于ConsumerCoordinator 类中),根据给定@987654323 的定义@。如果通过以下方法启用定期偏移提交,则处理它们:

    public void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
    

    我希望这会有所帮助!

    【讨论】:

    • 我认为这不能回答问题。答案是正确的,poll() 将返回空。但是 - 位置会发生什么(返回的最新偏移量)?它不会改变,因为我们需要在下一次获取尝试时再次使用它。并且自动提交将再次提交相同的偏移量。查看 ConsumerCoordinator.doAutoCommitOffsetsAsync() 并查看它始终提交当前位置。
    • @GwenShapira - 非常感谢您的评论/建议。 :) 阅读完这篇文章后,我回顾了源代码并意识到我完全错过了 poll() 方法中的 pollOnce() 调用,该方法正在调用 ConsumerCoordinator 中的 poll() 来发挥作用。无论如何,我已经将答案从“否”更新为“是”并添加了更多解释。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-01-17
    • 1970-01-01
    • 2021-02-22
    • 1970-01-01
    • 2021-11-14
    • 1970-01-01
    相关资源
    最近更新 更多