【问题标题】:Kafka Consumer - Java Client卡夫卡消费者 - Java 客户端
【发布时间】:2016-02-21 01:06:19
【问题描述】:

我在 kafka 消费者文档中看到了这个注释 -

由于有许多分区,这仍然可以平衡许多分区的负载 消费者实例。但是请注意,不能有更多的消费者 实例而不是分区。

我有一个主题的 50 个分区。如果我将 a_numThreads 值设为 50,是否会从每个分区中获取 1 条消息?以上消息是否意味着我在任何时间点都不能创建超过 50 个线程?

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    您正在执行a_numThreads = 50 然后Executors.newFixedThreadPool(a_numThreads); 是的事实,这意味着您不能在任何时间点创建超过 50 个线程,至少不能使用该执行程序。

    文档的意思是,一个分区只能分配给 1 个流,如果您不是创建 50 个流而是创建 51 个流,则后者将一无所获,正如 here 所解释的那样

    【讨论】:

      猜你喜欢
      • 2018-05-05
      • 1970-01-01
      • 2016-06-05
      • 2019-07-03
      • 2021-08-22
      • 1970-01-01
      • 1970-01-01
      • 2020-10-28
      • 2015-12-18
      相关资源
      最近更新 更多