【问题标题】:Kafka consumer group consumer failover detectionKafka消费者组消费者故障转移检测
【发布时间】:2021-11-18 22:57:46
【问题描述】:

我有 1 个 kafka 主题,只有 1 个分区。

在任何时候,都可以有多个 kafka 客户端。所有客户端都使用相同的消费者组订阅。因此,在任何给定时间点,只有 1 个客户端会接收消息。假设从 t0 到 t10,consumer1 正在获取消息,但一段时间后它停止获取消息,并且 consumer2 被选为新的领导者(可能是因为 consumer1 中的 GC 暂停)。在我的 consumer1 中,我想检测何时发生此故障转移,以便它可以刷新其本地状态。

可以用kafka客户端代码做吗?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    可以使用ConsumerRebalanceListener接口中的onPartitionsRevoked回调方法。

    来自description

    用户可以实现的回调方法,以提供对自定义存储的偏移提交的处理。 当消费者必须放弃一些分区时,将在重新平衡操作期间调用此方法。 也可以在消费者关闭或取消订阅时调用。 建议在此回调中将偏移量提交到 Kafka 或自定义偏移量存储,以防止重复数据。

    示例实现:

    private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
            // Add your changes here to flush
        }
    
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
        }
    }
    

    示例:

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    public class TestConsumer {
        
        KafkaConsumer<String, String> kafkaConsumer;
        
        public static void main(String[] args) {
            TestConsumer consumer = new TestConsumer();
            consumer.pollMessages();
        }
    
        public TestConsumer() {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            
            kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
        }
        
        public void pollMessages() {
            while(true) {
                System.out.println("Polling");
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
                System.out.println(records.count());
            }
        }
        
        private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
                // Add your changes here to flush
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
            }
        }
    }
    

    输出:

    Poll
    Partitions assignment listener: [input-topic-0]
    0
    Poll
    0
    Poll
    Partitions revoke listener: [input-topic-0]
    Partitions assignment listener: [input-topic-0]
    0
    Poll
    0
    

    【讨论】:

    • 我认为 revoke 和 assignment 是一种罕见的操作,它是否像您在输出中显示的那样经常发生?
    • 你是对的,这是一种罕见的操作,如果组中只有一个消费者,这种情况不会经常发生。我启动了两个消费者(主题只有一个分区)来展示回调方法的工作情况。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-10
    • 1970-01-01
    • 2020-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-05-19
    相关资源
    最近更新 更多