【发布时间】:2019-05-14 09:46:08
【问题描述】:
我已使用此命令手动创建主题test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
并使用此命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
我插入了这些记录:
This is a message
This is another message
This is a message2
首先,我通过这样的命令行使用消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
并且所有记录都成功显示。然后,我尝试使用以下代码在 Java 中实现消费者:
public class KafkaSubscriber {
public void consume() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
// also with this command
// consumer.subscribe(Arrays.asList("test"));
System.out.println("Starting to read data...");
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("Number of records found: " + records.count());
for (ConsumerRecord rec : records) {
System.out.println(rec.value());
}
}
catch (Exception ex) {
ex.printStackTrace();
}
}
}
catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
但是输出是:
Starting to read data...
0
0
0
0
0
....
这意味着它在主题test 中没有找到任何记录。我还尝试在 Java 使用者启动之后发布一些记录,但同样如此。有什么想法可能出了什么问题?
编辑:添加以下行后:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
消费者现在只在我向主题写入新记录时读取。它不会从一开始就读取所有记录。
【问题讨论】:
-
能否请您在属性中添加
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG为earliest然后发布结果? -
@Bitswazsky 我用结果编辑了我的问题。它现在在发布者写入新内容时读取,但当消费者开始时,它不会读取主题中已有的内容。
标签: java apache-kafka kafka-consumer-api