【问题标题】:How to check whether Kafka Server is running?如何检查 Kafka Server 是否正在运行?
【发布时间】:2016-10-21 14:45:15
【问题描述】:

我想在开始生产和消费作业之前确保 kafka 服务器是否正在运行。它在windows环境中,这是我在eclipse中的kafka服务器代码......

Properties properties = new Properties();
properties.setProperty("broker.id", "1");
properties.setProperty("port", "9092");
properties.setProperty("log.dirs", "D://workspace//");
properties.setProperty("zookeeper.connect", "localhost:2181"); 

Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(properties);       
KafkaServer kafka = new KafkaServer(config, new CurrentTime(), option);
kafka.startup();

在这种情况下if (kafka != null) 是不够的,因为它总是正确的。那么有什么方法可以知道我的 kafka 服务器正在运行并准备好用于生产者。我有必要检查一下,因为它会导致一些起始数据包丢失。

【问题讨论】:

    标签: java apache-kafka kafka-producer-api


    【解决方案1】:

    必须为所有 Kafka 代理分配一个broker.id。启动时,代理将在 Zookeeper 中创建一个临时节点,路径为/broker/ids/$id。由于节点是短暂的,它将在代理断开连接后立即被删除,例如通过关闭。

    您可以像这样查看临时代理节点的列表:

    echo dump | nc localhost 2181 | grep brokers

    ZooKeeper 客户端接口公开了许多命令; dump 列出集群的所有会话和临时节点。

    注意,以上假设:

    • 您在 localhost 的默认端口 (2181) 上运行 ZooKeeper,而 localhost 是集群的领导者
    • 您的 zookeeper.connect Kafka 配置没有为您的 Kafka 集群指定 chroot 环境,即它只是 host:port 而不是 host:port/path

    【讨论】:

    • 所以,这实际上检查zookeeper 是否有至少一个 kafka 连接。它不会测试 your kafka 是否正在运行。在OP的情况下它是正确的,但这是一个间接测试。可能必须研究在端口 9092 上可以做什么以进行直接测试。
    【解决方案2】:

    我使用了AdminClient api。

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("connections.max.idle.ms", 10000);
    properties.put("request.timeout.ms", 5000);
    try (AdminClient client = KafkaAdminClient.create(properties))
    {
        ListTopicsResult topics = client.listTopics();
        Set<String> names = topics.names().get();
        if (names.isEmpty())
        {
            // case: if no topic found.
        }
        return true;
    }
    catch (InterruptedException | ExecutionException e)
    {
        // Kafka is not available
    }
    

    【讨论】:

    • 这似乎不正确,因为它在只有一个代理启动时返回 true。
    • @Leon 你能详细说明你的评论吗?
    • 如果您有 >=3 个代理且副本为 3,则如果 1 个代理启动而另一个 >=2 关闭,则代码返回 true。实际上它只需要 1 个 zk 节点,您可以在没有任何代理运行的情况下获取主题名称列表。
    • @Leon 我觉得这个答案很有价值。当然,您需要了解您实际“监控”的是什么。像这里一样探测 Zookeeper 和探测引导服务器将回答两组不同的问题。我认为从客户端的角度来看,连接到引导服务器是正确的做法。即使您应该对 Kafka 集群(ZK 和节点)进行专门的监控,但验证特定客户端是否可以实际连接到集群是有意义的。
    • 这应该被标记为接受的答案!
    【解决方案3】:

    您可以在您的机器上安装 Kafkacat 工具

    例如在 Ubuntu 上您可以使用安装它

    apt-get install kafkacat
    

    kafkacat 安装完成后,可以使用以下命令进行连接

    kafkacat -b <your-ip-address>:<kafka-port> -t test-topic
    
    • 用你的机器ip替换
    • 可以替换为运行 kafka 的端口。通常是 9092

    一旦你运行了上面的命令,如果 kafkacat 能够建立连接,那么这意味着 kafka 已经启动并运行了

    【讨论】:

    【解决方案4】:

    对于 Linux,“ps aux | grep kafka”查看结果中是否显示 kafka 属性。例如。 /path/to/kafka/server.properties

    【讨论】:

    • 他们在 Windows 中没有指定它吗?
    【解决方案5】:

    Paul 的回答非常好,从经纪人的角度来看,这实际上是 Kafka 和 Zk 如何协同工作的。

    我想说检查 Kafka 服务器是否正在运行的另一个简单方法是创建一个简单的 KafkaConsumer 指向集群并尝试一些操作,例如 listTopics()。如果 kafka 服务器没有运行,你会得到一个TimeoutException,然后你可以使用try-catch 语句。

      def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
        props.put("group.id", kafkaParams.get("group.id").get.toString)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        val simpleConsumer = new KafkaConsumer[String, String](props)
        simpleConsumer.listTopics()
      }
    

    【讨论】:

    【解决方案6】:

    最好的选择是在开始生成或使用消息之前使用AdminClient,如下所示

    private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;           
     try (AdminClient client = AdminClient.create(properties)) {
                client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
            } catch (ExecutionException ex) {
                LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS);
                return;
            }
    

    【讨论】:

    • 请注意,AdminClient 仅从 0.11 版本开始可用
    【解决方案7】:

    首先你需要创建 AdminClient bean:

     @Bean
     public AdminClient adminClient(){
       Map<String, Object> configs = new HashMap<>();
       configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
       StringUtils.arrayToCommaDelimitedString(new Object[]{"your bootstrap server address}));
       return AdminClient.create(configs);
     }
    

    然后,你可以使用这个脚本:

    while (true) {
       Map<String, ConsumerGroupDescription> groupDescriptionMap =
             adminClient.describeConsumerGroups(Collections.singletonList(groupId))
             .all()
             .get(10, TimeUnit.SECONDS);
    
       ConsumerGroupDescription consumerGroupDescription = groupDescriptionMap.get(groupId);
    
       log.debug("Kafka consumer group ({}) state: {}",
                    groupId,
                    consumerGroupDescription.state());
    
       if (consumerGroupDescription.state().equals(ConsumerGroupState.STABLE)) {
            boolean isReady = true;
            for (MemberDescription member : consumerGroupDescription.members()) {
                if (member.assignment() == null || member.assignment().topicPartitions().isEmpty()) {
                isReady = false;
                }
            }
    
            if (isReady) {
                break;
               }
            }
    
            log.debug("Kafka consumer group ({}) is not ready. Waiting...", groupId);
            TimeUnit.SECONDS.sleep(1);
    }
    

    此脚本将每秒检查消费者组的状态,直到状态为STABLE。因为所有消费者都分配到主题分区,所以您可以断定服务器正在运行并准备就绪。

    【讨论】:

      【解决方案8】:

      如果服务器正在运行,您可以使用以下代码检查可用的代理。

      import org.I0Itec.zkclient.ZkClient;
           public static boolean isBrokerRunning(){
              boolean flag = false;
              ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 10000);//, kafka.utils.ZKStringSerializer$.MODULE$);
              if(zkClient!=null){
                  int brokersCount = zkClient.countChildren(ZkUtils.BrokerIdsPath());
                  if(brokersCount > 0){
                      logger.info("Following Broker(s) {} is/are available on Zookeeper.",zkClient.getChildren(ZkUtils.BrokerIdsPath()));
                      flag = true;    
                  }
                  else{
                      logger.error("ERROR:No Broker is available on Zookeeper.");
                  }
                  zkClient.close();
      
              }
              return flag;
          }
      

      【讨论】:

      • 这里,如果我已经有多个代理在运行,那么会返回错误的结果。您的解决方案可能适用于单一代理环境。
      • 是的,对于单一经纪人来说,这是完美的。顺便说一句,您没有提到多个。让我试一试。您可以检查任何一个经纪人是否已启动,不要检查其他经纪人,因此只要任何经纪人已启动,您就可以跳过其他经纪人。另一方面,您仍然需要检查最后运行的代理,因为在非常不幸的情况下,前 N 个代理可能已关闭。
      【解决方案9】:

      我在confluent Kafka中发现了一个事件OnError

      consumer.OnError += Consumer_OnError;
      
       private void Consumer_OnError(object sender, Error e)
          {
              Debug.Log("connection error: "+ e.Reason);
              ConsumerConnectionError(e);
          }
      

      及其代码文档:

          //
          // Summary:
          //     Raised on critical errors, e.g. connection failures or all brokers down. Note
          //     that the client will try to automatically recover from errors - these errors
          //     should be seen as informational rather than catastrophic
          //
          // Remarks:
          //     Executes on the same thread as every other Consumer event handler (except OnLog
          //     which may be called from an arbitrary thread).
          public event EventHandler<Error> OnError;
      

      【讨论】:

        猜你喜欢
        • 2012-07-22
        • 1970-01-01
        • 2020-02-17
        • 1970-01-01
        • 1970-01-01
        • 2022-12-03
        • 2016-04-25
        • 2010-09-06
        • 2017-10-14
        相关资源
        最近更新 更多