【发布时间】:2016-10-31 22:22:14
【问题描述】:
我在Apache Kafka 上建立了一个排队系统。该应用程序将向特定的Kafka topic 生成消息,并且在消费者端,我必须使用该主题生成的所有记录。
我使用新的 Java Consumer Api 编写了消费者。
代码看起来像
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
这里我需要永远运行消费者,以便生产者推送到 kafka 主题的任何记录都应该立即消费和处理。
所以我的困惑是,使用无限while循环(如示例代码中)来消耗数据是否正确?
【问题讨论】:
标签: java apache-kafka kafka-consumer-api