【问题标题】:Run kafka consumer without specifying partition在不指定分区的情况下运行 kafka 消费者
【发布时间】:2021-11-11 16:45:25
【问题描述】:

我最近在学习Kafka,除非我指定--parititon 0参数,否则我的消费者无法消费任何记录。换句话说,我不能使用以下记录:

kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic 

但工作方式如下:

kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic --partition 0

主要问题是,当我转移到 java 代码时,我的 KafkaConsumer 类无法获取记录,我需要知道如何在 java KafkaConsumer 中指定 partition 数字?!

我当前的java代码是:


public class ConsumerDemo {

    public static void main(String[] args) {

        Logger logger = LoggerFactory.getLogger((ConsumerDemo.class.getName()));

        String bootstrapServer = "127.0.0.10:9092";
        String groupId = "my-kafka-java-app";
        String topic = "first-topic";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        //properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic
        consumer.subscribe(Collections.singleton(topic)); //means subscribtion to one topic

        // poll for new data
        while(true){
            //consumer.poll(100); old way
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records){
                logger.info("Key: " + record.key() + ", Value: "+ record.value() );
                logger.info("Partition: " + record.partition() + ", Offset: "+ record.offset());
            }

        }

    }
}

【问题讨论】:

  • 使用新鲜消费组时无法复制。您是否修改了任何 Kafka 服务器属性?您使用的是什么版本的 Kafka 和客户端?
  • 换句话说,请显示kafka-consumer-groups --bootstrap-server 127.0.0.10:9092 --group my-kafka-java-app --describe的输出... 为什么是127.0.0.10?此地址不是正确的环回 IP 或专用 LAN 地址
  • @OneCricketeer,关于IP,这是一个假设的IP,而在我们执行时我输入了服务器的真实姓名..这没什么大不了的
  • @OneCricketeer,关于命令的输出,它显示:Error: Executing consumer group command failed due to org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
  • cloudera上的版本是2.1.0-cdh6.2.1

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

经过大量检查,我的解决方案是使用 consumer.assignconsumer.seek 而不是使用 consumer.subscribe 并且没有指定 groupId。但我觉得应该有更优化的解决方案

java 代码如下:

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic
        //consumer.subscribe(Collections.singleton(topic)); //means subscription to one topic
        // using assign and Seek, are mostly used to replay data or fetch a specific msg
        TopicPartition  partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 15L;
        // assign
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek: for a specific offset to read from
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

【讨论】:

    【解决方案2】:

    你的做法是正确的。订阅主题时无需指定分区。也许你的消费者组已经消费了主题中的所有消息并提交了最新的偏移量。

    确保在您运行应用程序或创建新的消费者组以从头开始消费时产生新消息(如果您将ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为“最早”)

    【讨论】:

    • 问题是我什至在生产者之前运行消费者,当我在 shell 命令中这样做时,它只有在我指定 --partition 时才有效。即使在 java 代码中,即使我并行运行 1 个以上的消费者,它也不会消耗记录。我试图改变 group_id ..但没有工作
    【解决方案3】:

    顾名思义,ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 属性旨在配置一个Partition Assignment Strategy 而不是按照命令行的指示设置一个固定的分区。

    使用的默认策略是RangeAssignor,可以更改为StickyAssignor,如下所示:

    properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());
    

    你可以阅读更多关于Kafka Client Side Assignment Proposal的信息。

    【讨论】:

    • 虽然策略行被注释掉了
    • @OneCricketeer,确切地说,我认为这在当前情况下不会成为问题
    猜你喜欢
    • 1970-01-01
    • 2017-01-04
    • 2018-12-03
    • 1970-01-01
    • 1970-01-01
    • 2017-10-17
    • 2018-10-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多