【问题标题】:Kafka Java Consumer already closedKafka Java Consumer 已经关闭
【发布时间】:2026-02-02 13:15:02
【问题描述】:

我刚刚开始使用 Kafka。我正面临与消费者的一个小问题。我用Java编写了一个消费者。

我收到此异常 - IllegalStateException 此消费者已关闭。

以下行出现异常:

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);

在我的消费者因某些异常而崩溃之后开始发生这种情况,当我再次尝试运行它时,它给了我这个异常。

这是完整的代码:

package StreamApplicationsTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class StreamAppConsumer {

public static void main(String[] args){
    int i = 0;
    //List<String> topics = new ArrayList<>();
    List<String> topics = Collections.singletonList("test_topic");
    //topics.add("test_topic");
    Properties consumerConfigurations = new Properties();
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");

    Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
    consumer.subscribe(topics);

    while(true){
        ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
        while(iterator.hasNext()){
            i++;
            ConsumerRecord<String,String> consumerRecord = iterator.next();
            String key = consumerRecord.key();
            String value = consumerRecord.value();
            if(key=="exit" || value=="exit")
                break;
            System.out.println("Key="+key+"\tValue="+value);
        }

        System.out.println("Messages processed = "+Integer.toString(i));
        consumer.close();

    }
}
}

我只是被这个问题所困扰,任何形式的帮助都会很有用。

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    发生这种情况是因为您在无限循环结束时关闭了消费者,因此当它第二次轮询消费者时,消费者已经关闭。为了解决眼前的问题,我将整个while(true) 循环包装在一个try-catch 中,并在catch 或finally 块中处理消费者关闭。

    但是,如果 Kafka 使用者未仔细处理不同的关闭信号,您将面临丢失数据的风险。我建议查看 Confluent 的示例,以了解如何优雅地关闭消费者 here。在你的情况下,因为你在主线程中运行它看起来像这样......

    public static void main(String[] args) {
        int i = 0;
        //List<String> topics = new ArrayList<>();
        List<String> topics = Collections.singletonList("test_topic");
        //topics.add("test_topic");
        Properties consumerConfigurations = new Properties();
        consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId");
    
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations);
        consumer.subscribe(topics);
    
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
          public void run() {
            consumer.wakeup();
          }
        });
    
        try {
          while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            while (iterator.hasNext()) {
              i++;
              ConsumerRecord<String, String> consumerRecord = iterator.next();
              String key = consumerRecord.key();
              String value = consumerRecord.value();
              if (key == "exit" || value == "exit")
                break;
              System.out.println("Key=" + key + "\tValue=" + value);
            }
            System.out.println("Messages processed = " + Integer.toString(i));
          }
        } catch (WakeupExection e) {
          // Do Nothing
        } finally {
          consumer.close();
        }
      }
    }
    

    基本上运行consumer.wakeup() 是消费者中唯一的线程安全方法,因此它是唯一可以在Java 的关闭挂钩中运行的方法。由于调用唤醒时消费者没有睡着,它会触发唤醒执行,从而优雅地关闭消费者。

    【讨论】:

      【解决方案2】:

      这似乎有效

      public static void main(String[] args) {
      
              List<String> topics = new ArrayList<>();
              topics.add("test.topic");
      
              final Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP_TO_KAFKA_SERVER");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              consumer.subscribe(topics);
      
              System.out.println("Polling");
              ConsumerRecords<String, String> consumerRecords = consumer.poll(5000);
      
              try {
                  for (ConsumerRecord<String, String> record : consumerRecords) {
                      System.out.println(record.offset() + ": " + record.value());
                  }
              } finally {
                  consumer.close();
              }
          }
      

      确保您的服务器或本地 kafka 可访问

      输出

      --- exec-maven-plugin:1.2.1:exec (default-cli) @ MVN ---
      SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
      SLF4J: Defaulting to no-operation (NOP) logger implementation
      Polling
      SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
      6: test
      7: tes
      

      【讨论】: