【问题标题】:How to detect if kafka broker is not available from consumer in java?java - 如何检测java中的消费者是否无法使用kafka代理?
【发布时间】:2018-03-29 18:45:24
【问题描述】:

我有一个简单的 Java Kafka 消费者。如果 Kafka 代理不可用,我正在尝试捕获异常。我需要它来中断线程。

我有这样的代码:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties());
kafkaConsumer.subscribe(Arrays.asList(topic));
try {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
    // records handling
} catch(Exception e) {
    System.out.println(e.getMessage());
}finally {
    kafkaConsumer.close();
}

如果 Kafka 服务器关闭,我不会捕获任何异常,但日志中会显示以下消息:

18/03/28 13:33:39 WARN clients.NetworkClient: [Consumer clientId=consumer-3, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.
18/03/28 13:33:40 WARN clients.NetworkClient: [Consumer clientId=consumer-1, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.

有没有办法在我的线程中处理它?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    这样的东西可以工作:

    Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    System.out.println("Starting exit...");
                    consumer.wakeup(); 1
                    try {
                        mainThread.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    

    你做consumer.wakeup()来中断当前消费者的操作。

    mainThread.join() 被放在那里以确保主线程真正完成并且在唤醒后的处理过程中不会关闭。请记住,shutdownHook 也负责处理中断,而不仅仅是普通的程序关闭。

    【讨论】:

      【解决方案2】:

      我解决了这个问题有点笨拙,但它对我有用。

      public boolean testSocket(String serversList) {        
          String[] sockets = serversList.split(",");
          int unactive = 0;
          for (int i = 0; i < sockets.length; i++) {
              try {
                  String[] socket = sockets[i].split(":");
                  (new Socket(socket[0], Integer.valueOf(socket[1]))).close();
              } catch (IOException e) {
                  unactive++;
              }
          }
          if (unactive < sockets.length) return true;
          return false;
      }
      

      【讨论】:

        猜你喜欢
        • 2017-01-31
        • 1970-01-01
        • 1970-01-01
        • 2015-12-12
        • 1970-01-01
        • 1970-01-01
        • 2019-09-27
        • 2016-05-29
        • 1970-01-01
        相关资源
        最近更新 更多