【发布时间】:2016-02-21 01:06:19
【问题描述】:
我在 kafka 消费者文档中看到了这个注释 -
由于有许多分区,这仍然可以平衡许多分区的负载 消费者实例。但是请注意,不能有更多的消费者 实例而不是分区。
我有一个主题的 50 个分区。如果我将 a_numThreads 值设为 50,是否会从每个分区中获取 1 条消息?以上消息是否意味着我在任何时间点都不能创建超过 50 个线程?
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
【问题讨论】:
标签: java apache-kafka kafka-consumer-api