【发布时间】:2013-11-11 13:32:30
【问题描述】:
我写了一个high level Kafka consumer 作为 java 应用程序的一部分。
所以核心代码是这样的:
public void start() {
ConsumerConnector consumerConnector = conf.getConsumerConnector();
String topic = conf.getTopic();
int numOfThereads = conf.getNumOfThreads();
Map<String, Integer> topicCountMap = ImmutableMap.of(topic, numOfThereads);
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);
// create 4 threads to consume from each of the partitions
executor = Executors.newFixedThreadPool(numOfThereads);
// consume the messages in the threads
for (final KafkaStream<Message> stream : streams) {
executor.submit(new ConsumerThread(stream));
}
}
为了测试我的消费者,我还创建了一个生产者,写信给 kafka,然后启动了我的消费者,它可以工作。 由于线程是在循环中执行的,我不确定我是否做对了。 我希望我的消费者永远运行并继续使用来自 kafka 的消息。
什么是让它永远运行的正确方法?
【问题讨论】:
-
您如何迭代来自 Kakfa 的消息?根据他们的文档,他们说
The interesting part here is the while (it.hasNext()) section. Basically this code reads from Kafka until you stop it. -
就我的测试而言,您似乎是对的,除非我停止,否则消费者会继续在 bg 中工作。随时将其发布为答案
标签: java multithreading apache-kafka