【问题标题】:Not all kafka consumers are getting assigned to partitions并非所有 kafka 消费者都被分配到分区
【发布时间】:2020-02-18 18:39:48
【问题描述】:

我有 10 个消费者和 10 个分区。 我取分区数

int partitionCount = getPartitionCount(kafkaUrl);

我创建了相同数量的具有相同 group.id 的消费者。

    public void listen() {
        try {
            String kafkaUrl = getKafkaUrl();
            int partitionCount = getPartitionCount(kafkaUrl);
            Stream.iterate(0, i -> i + 1)
                    .limit(partitionCount)
                    .forEach(index -> executorService.execute(() ->
                            consumerTask.invokeKafkaConsumerTask(prepareConsumerConfig(index, kafkaUrl), INPUT_TOPIC)));
        } catch (Exception exception) {
            logger.error("Cannot receive event from kafka ", exception);
        }


    public void invokeKafkaConsumerTask(Properties properties, String topicName) {
        try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(topicName));
            logger.info("[KAFKA] consumer created");
            invokeKafkaConsumer(consumer);
        } catch (IllegalArgumentException exception) {
            logger.error("Cannot create kafka consumer ", exception);
        }
    }

    private void invokeKafkaConsumer(KafkaConsumer<String, String> consumer) {
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(4));
                if (consumerRecords.count() > 0) {
                    consumeRecords(consumerRecords);
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            logger.error("Error while receiving records ", e);
        }
    }

方法getPartitionCount 返回 10 个分区,使其正常工作

配置看起来像这样

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_CLIENT_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + index);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

将消费者分配到分区后看到的内容

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CLIENT-ID                                                        
topicName  1          89391           89391           0               consumer0
topicName  3          88777           88777           0               consumer1
topicName  5          89280           89280           0               consumer2
topicName  4          88776           88776           0               consumer2
topicName  0          4670991         4670991         0               consumer0
topicName  9          23307           89343           66036           consumer4
topicName  7          89610           89610           0               consumer3
topicName  8          88167           88167           0               consumer4
topicName  2          89138           89138           0               consumer1
topicName  6          88967           88967           0               consumer3

只有一半的消费者被分配到分区 为什么会这样?
根据文档,每个分区应该有一个使用者。
我做错了吗?
kafka 2.1.1 版

我也发现很少有这样的日志 ->

Setting newly assigned partitions:[empty]

【问题讨论】:

  • 属性auto.leader.rebalance.enable的配置是什么?
  • 我在配置中看不到它,但在日志中我可以看到auto.leader.rebalance.enable = true
  • 你能分享kafka-topics --describe --zookeeper my_zookeeper_ip:2181 --topic yourTopicName的输出吗
  • 和我的描述一样 -> PARTITION 从 0-9 和 CONSUMER-ID 从 0-4 每个消费者有 2 个分区 ctxt.io/2/AABABWsYEg
  • 我的错。本来是kafka-consumer-groups.sh --bootstrap-server &lt;kafka_brokers&gt; --describe –group &lt;consumer_group_id&gt;

标签: apache-kafka kafka-consumer-api


【解决方案1】:

[解决方案] 有趣的案例我更改了 group.id 和 partition.assignment.strategy,添加了 auto.offset.reset=earliest 并且看起来它可以工作...

【讨论】:

    【解决方案2】:

    您订阅的是主题名称或 java 模式的集合吗?
    如果您订阅的是 Pattern ,请将 partition.assignment.strategy 更改为 RoundRobinAssignorStickyAssignor

    【讨论】:

    • 它是字符串 - 主题名称和策略是 org.apache.kafka.clients.consumer.RoundRobinAssignor
    猜你喜欢
    • 2019-03-17
    • 1970-01-01
    • 1970-01-01
    • 2021-01-26
    • 2017-01-04
    • 1970-01-01
    • 2019-08-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多