这很有趣。
编辑:根据最近的评论,对此进行更新。更新的部分被划线并明确标记或以斜体标记。
我会选择 "No" "Yes" 即最后返回的消息的偏移量将 NOT 一次又一次地提交,如果没有新的消息已到达主题。
这里有同样的解释。
典型的消费者示例如下所示:
Properties props = new Properties();
<other-properties>
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
因此,偏移提交的责任通常由消费者承担,并由轮询循环驱动。
现在,在您描述的最后一次提交之后的场景中,每次调用 poll() 方法都会返回一个空映射。 所以,如果poll() 没有返回记录,那么就没有要提交的新偏移量。
以下是我如何追溯 Kafka 的源代码并得出这个结论的。以下return语句来自poll()给定的here方法定义
return ConsumerRecords.empty();
此file 中可用的empty() 方法的定义。
编辑:以下部分是根据 Gwen 的评论新增的内容。
然而,在返回空映射之前,通过KafkaConsumer 类的poll() 方法调用另一个poll() 方法(位于ConsumerCoordinator 类中),根据给定@987654323 的定义@。如果通过以下方法启用定期偏移提交,则处理它们:
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
我希望这会有所帮助!