【发布时间】:2021-01-31 02:58:44
【问题描述】:
我正在使用 Kafka 0.9.1 新的消费者 API。消费者被手动分配到一个分区。对于这个消费者,我希望看到它的进步(意味着滞后)。由于我添加了 group id consumer-tutorial 作为属性,我假设我可以使用命令
bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092
(这里解释为http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client)
很遗憾,我的消费者组详细信息未使用上述命令显示。因此我无法监控我的消费者的进度(这是滞后的)。如何监控上述场景(手动分配分区)中的延迟?
代码是:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(topicPartition);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitSynch();
}
} finally {
consumer.close();
}
【问题讨论】:
标签: apache-kafka kafka-consumer-api