【问题标题】:How to consume only latest offset in Kafka topic如何仅使用 Kafka 主题中的最新偏移量
【发布时间】:2020-08-16 04:08:35
【问题描述】:

我正在开发一个使用 kafka 的 scala 应用程序。我的kafka消费者代码如下。

def getValues(topic: String): String  = {
        
  val props = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(util.Collections.singletonList(topicPartition))
  val offset = consumer.position(topicPartition) - 1
  val record = consumer.poll(Duration.ofMillis(500)).asScala
  for (data <- record)
    if(data.offset() == offset) val value = data.value()
  return value
}

在此我只想返回最新值。当我运行我的应用程序时,我得到以下日志:

 Resetting offset for partition topic-0 to offset 0

因为 val offset = consumer.position(topicPartition) - 1 变为 -1 并且 data.offset() 给出了所有偏移量的列表。结果我没有得到最新的价值。为什么它会自动将偏移量重置为0?我该如何纠正?我的代码有什么错误?或任何其他方式我可以从最新的偏移量中获取值?

【问题讨论】:

  • 您的主题通常包含多少元素?

标签: scala apache-kafka kafka-consumer-api


【解决方案1】:

您正在寻找 seek 方法 - 根据 JavaDocs - “覆盖消费者将在下一次轮询(超时)时使用的获取偏移量”。

还要确保您正在设置

props.put("auto.offset.reset", "latest")

对您的代码进行这两项修改后,以下内容对我来说仅获取所选主题中部分0 的最新偏移量的value

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

def getValues(topic: String): String  = {
    val props = new Properties()
    props.put("group.id", "test")
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

    val topicPartition = new TopicPartition(topic, 0)
    consumer.assign(java.util.Collections.singletonList(topicPartition))
    val offset = consumer.position(topicPartition) - 1
    consumer.seek(topicPartition, offset)
    val record = consumer.poll(Duration.ofMillis(500)).asScala
    for (data <- record) {
      val value: String = data.value() // you are only reading one message if no new messages flow into the Kafka topic
    }
    value
}

【讨论】:

  • 这只有在主题只有一个分区时才有效。
  • OP故意将主题分区定义为TopicPartition(topic, 0),所以@C4stor这里应该不是问题?
  • 无论如何,好点,我会在我的回答中说清楚。感谢您指出!
  • 好吧,我不知道^^,因为OP代码一开始就不起作用,我不知道哪些部分是错误,哪些部分是假设^^
【解决方案2】:

在这一行,props.put("auto.offset.reset", "earliest"),您将 Kafka 消费者的参数 auto.offset.reset 设置为 earliest,这会将偏移量重置为最早。如果你想要最新的值,你应该使用latest。 你可以找到文档here

【讨论】:

  • 但是最新的我可以得到我在消费者启动时发送的值。我已经有 kafka 主题的数据。我只需要消耗最新的一个。我该怎么做?
猜你喜欢
  • 2016-05-27
  • 2017-12-12
  • 1970-01-01
  • 2020-08-04
  • 1970-01-01
  • 2021-01-18
  • 2018-01-23
  • 2016-11-20
  • 2020-05-20
相关资源
最近更新 更多