【问题标题】:Kafka consumer properties from the beginning in a topicKafka 消费者属性从一个主题开始
【发布时间】:2020-07-18 08:50:33
【问题描述】:

我正在尝试编写一个 Kafka 消费者来从一开始就使用这些消息。我可以使用 --from-beginning 从控制台消费者做同样的事情

但我在 JAVA API 中找不到相应的属性。

 def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }

还有一个关于 Avro 消息的 value.deserializer 应该是什么的问题?

【问题讨论】:

    标签: scala apache-kafka kafka-consumer-api avro


    【解决方案1】:

    kafka-console-consumer 中使用的--from-beginning 的影响可以通过将auto.offset.reset 设置为earliest 来实现。结合独特/新的group.id 具有相同的效果。

    基本上,您想创建一个新的消费者组(通过group.id),并且由于Kafka Broker 不知道这个消费者组,它会根据配置auto.offset.reset 自动重置这个消费者组的偏移量。当设置为earliest 时,它将从头开始。当设置为latest 时,它会等待新的传入数据。

    关于 Avro 反序列化,here 可能会有所帮助。

    【讨论】:

    • Avro 反序列化函数 from_avro 仅适用于 Spark 2.4 及更高版本,如何在 scala 中进行反序列化而不是使用 spark。
    猜你喜欢
    • 2016-09-04
    • 2017-02-15
    • 1970-01-01
    • 2017-01-26
    • 1970-01-01
    • 2018-07-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-12
    相关资源
    最近更新 更多