【发布时间】:2020-07-18 08:50:33
【问题描述】:
我正在尝试编写一个 Kafka 消费者来从一开始就使用这些消息。我可以使用 --from-beginning 从控制台消费者做同样的事情
但我在 JAVA API 中找不到相应的属性。
def consumeFromKafka(topic: String) = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
while (true) {
val record = consumer.poll(1000).asScala
for (data <- record.iterator)
println(data.value())
}
}
还有一个关于 Avro 消息的 value.deserializer 应该是什么的问题?
【问题讨论】:
标签: scala apache-kafka kafka-consumer-api avro