【问题标题】:How to get Kafka messages based on timestamp如何根据时间戳获取Kafka消息
【发布时间】:2020-11-18 06:12:23
【问题描述】:

我正在开发一个使用 kafka 的应用程序,而技术是 scala。我的kafka消费者代码如下:

val props = new Properties()
        props.put("group.id", "test")
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("auto.offset.reset", "earliest")
        props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(topic))
    val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

它给了我所有的记录,但问题是我已经在 kafka 消费者中有数据,这可能导致重复数据意味着具有相同键的数据已经存在于主题中。有什么方法可以让我从特定时间检索数据。如果我可以计算当前时间并仅检索那些在该时间之后出现的记录,则意味着在轮询之前。我有什么办法可以做到这一点?

【问题讨论】:

  • 您是否在寻找latest 选项-props.put("auto.offset.reset", "latest")
  • no latest 对我没有用,因为我已经有主题中的数据
  • 我已经在 kafka 消费者中有数据你能详细说明一下这个问题吗?一旦consumer group 中的consumer 阅读了消息Kafkacommit 消息并且应该阅读only once by consumer。你是怎么得到重复的?
  • 我可能有重复的键。我的制作人每次都会使用相同的密钥发送数据。我正在使用已经有数据的主题中的数据。我只是订阅它并阅读数据
  • @PrathapReddy 有有效的用例用于多次读取/处理

标签: scala apache-kafka kafka-consumer-api


【解决方案1】:

使用任何给定时间戳的唯一方法是

  1. 查找offsetsForTimes
  2. seekcommitSync 那个结果
  3. 开始投票

但是,你需要注意数据流是连续的,以后可能还会有重复的key。


如果您在数据中拥有相同的键,并且您只想查看最新的,那么您最好使用 KTable

【讨论】:

  • 抱歉,不想复制您的答案...只是花了一些时间来实际编写和测试我的代码。
  • @mike 没关系。我在使用所有 Futures 类之前编写了代码,所以它比你的要混乱得多
【解决方案2】:

您可以在 KafkaConsumer API 中使用 offsetsForTimes method

代码

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

object OffsetsForTime extends App {

  implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
    offsetQuery
      .map { case (tp, time) => tp -> new java.lang.Long(time) }
      .asJava

  val topic = "myInTopic"
  val timestamp: Long = 1595971151000L

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(topicPartition))
  // dummy poll before calling seek
  consumer.poll(Duration.ofMillis(500))

  // get next available offset after given timestamp
  val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head
  // seek to offset
  consumer.seek(topicPartition, offsetAndTimestamp.offset())

  // poll data
  val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

  for (data <- record) {
    println(s"Timestamp: ${data.timestamp()}, Key: ${data.key()}, Value: ${data.value()}")
  }

}

测试

./kafka/current/bin/kafconsole-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
CreateTime:1595971142560    1_old
CreateTime:1595971147697    2_old
CreateTime:1595971150136    3_old
CreateTime:1595971192649    1_new
CreateTime:1595971194489    2_new
CreateTime:1595971196416    3_new

将时间戳选择为3_old1_new 之间的时间,以仅使用“新”消息。

输出

Timestamp: 1595971192649, Key: null, Value: 1_new
Timestamp: 1595971194489, Key: null, Value: 2_new
Timestamp: 1595971196416, Key: null, Value: 3_new

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-12-21
    • 2019-08-10
    • 2021-07-14
    • 1970-01-01
    • 1970-01-01
    • 2021-09-22
    • 2019-08-18
    相关资源
    最近更新 更多