【发布时间】:2021-02-10 07:45:03
【问题描述】:
position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
committed(TopicPartition partition): OffsetAndMetadata
Get the last committed offset for the given partition (whether the commit happened by this process or another).
如果我需要使用特定消费者组的最新提交偏移量(用于 Spark Structured Streaming 的startingOffset),我应该使用什么。
我的代码显示已提交已弃用。
val latestOffset = consumer.position(partition)
val last=consumer.committed(partition)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
官方文档:
偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者的用户相关的位置概念: 消费者的位置给出了将要发出的下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一。每次消费者在调用 poll(long) 中收到消息时,它都会自动前进。
已提交位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一个提交 API(例如 commitSync 和 commitAsync)手动控制这个提交位置。
【问题讨论】:
标签: scala apache-kafka kafka-consumer-api