【发布时间】:2016-02-28 20:10:35
【问题描述】:
我遇到了这个link,用于在 Java 中实现 KafkaConsumer。下面列出的代码读取流并处理消息。一旦启动,此代码会继续收听传入的消息吗?如果是这样,它如何继续运行并继续消费消息?
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
【问题讨论】:
-
我想知道他们为什么决定称它为 Kafka。
-
卡夫卡起源于捷克语。卡夫卡的意思是“像一只鸟”。也许是一只“鸽子”,将消息从源传递到目的地;)
-
多么有趣!我会说一点(很少)捷克语,但我不知道,而且我的字典里没有。
标签: java spring apache-kafka