【发布时间】:2020-11-18 06:12:23
【问题描述】:
我正在开发一个使用 kafka 的应用程序,而技术是 scala。我的kafka消费者代码如下:
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(topic))
val record = consumer.poll(Duration.ofMillis(500)).asScala.toList
它给了我所有的记录,但问题是我已经在 kafka 消费者中有数据,这可能导致重复数据意味着具有相同键的数据已经存在于主题中。有什么方法可以让我从特定时间检索数据。如果我可以计算当前时间并仅检索那些在该时间之后出现的记录,则意味着在轮询之前。我有什么办法可以做到这一点?
【问题讨论】:
-
您是否在寻找latest 选项-
props.put("auto.offset.reset", "latest")? -
no latest 对我没有用,因为我已经有主题中的数据
-
我已经在 kafka 消费者中有数据你能详细说明一下这个问题吗?一旦
consumer group中的consumer阅读了消息Kafka将commit消息并且应该阅读only once by consumer。你是怎么得到重复的? -
我可能有重复的键。我的制作人每次都会使用相同的密钥发送数据。我正在使用已经有数据的主题中的数据。我只是订阅它并阅读数据
-
@PrathapReddy 有有效的用例用于多次读取/处理
标签: scala apache-kafka kafka-consumer-api