【发布时间】:2020-08-16 04:08:35
【问题描述】:
我正在开发一个使用 kafka 的 scala 应用程序。我的kafka消费者代码如下。
def getValues(topic: String): String = {
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
val topicPartition = new TopicPartition(topic, 0)
consumer.assign(util.Collections.singletonList(topicPartition))
val offset = consumer.position(topicPartition) - 1
val record = consumer.poll(Duration.ofMillis(500)).asScala
for (data <- record)
if(data.offset() == offset) val value = data.value()
return value
}
在此我只想返回最新值。当我运行我的应用程序时,我得到以下日志:
Resetting offset for partition topic-0 to offset 0
因为 val offset = consumer.position(topicPartition) - 1 变为 -1 并且 data.offset() 给出了所有偏移量的列表。结果我没有得到最新的价值。为什么它会自动将偏移量重置为0?我该如何纠正?我的代码有什么错误?或任何其他方式我可以从最新的偏移量中获取值?
【问题讨论】:
-
您的主题通常包含多少元素?
标签: scala apache-kafka kafka-consumer-api