【问题标题】:KafkaConsumer Position() vs Committed()KafkaConsumer Position() vs Committed()
【发布时间】:2021-02-10 07:45:03
【问题描述】:
position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).

committed(TopicPartition partition): OffsetAndMetadata  

Get the last committed offset for the given partition (whether the commit happened by this process or another).

如果我需要使用特定消费者组的最新提交偏移量(用于 Spark Structured Streaming 的startingOffset),我应该使用什么。

我的代码显示已提交已弃用。

  val latestOffset = consumer.position(partition)
  val last=consumer.committed(partition)

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.1</version>
    </dependency>

官方文档:

偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位于位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。实际上有两个与消费者的用户相关的位置概念: 消费者的位置给出了将要发出的下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一。每次消费者在调用 poll(long) 中收到消息时,它都会自动前进。

已提交位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这是消费者将恢复到的偏移量。消费者可以定期自动提交偏移量;或者它可以选择通过调用其中一个提交 API(例如 commitSync 和 commitAsync)手动控制这个提交位置。

【问题讨论】:

    标签: scala apache-kafka kafka-consumer-api


    【解决方案1】:

    您需要使用 Spark Streaming Job 中的 committed 偏移量作为startingOffset。

    position API 的计数器在运行时由 KafkaConsumer 递增,并且可能与committed API 的结果略有不同,因为消费者可能会或可能不会提交偏移量,如果它提交了它可能会这样做异步。

    在 Kafka 2.4.1 中,方法 committed(partition) 已被弃用,建议使用 newer API,它采用 Set 的 TopicPartitions。它的签名是:

    public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)
    

    当您使用 Scala 时,需要将您的 Scala 集转换为 Java 集。这可以按照here 的描述来完成。

    【讨论】:

    • 我们在 scala 中是否有任何关于使用 commited 以及如何从中获取 (partition->offset) 映射的示例
    • 示例:val last=consumer.committed(partitions.toSet) 如何提取分区图和偏移量 {"0"->200,"1"->100}
    • 它已经是一张地图了。因此,试图找出制作 scala 地图的最佳方法。 last.foreach(x=> println(x._1 +"->" +x._2.offset())) ..不确定。请帮助
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-05-14
    • 2019-09-15
    • 2018-05-12
    • 2020-01-29
    • 1970-01-01
    • 2021-09-23
    • 1970-01-01
    相关资源
    最近更新 更多