【问题标题】:Spring Boot: Kafka health indicatorSpring Boot:Kafka 健康指标
【发布时间】:2021-02-28 07:26:51
【问题描述】:

我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅仅是检查套接字连接)。我知道 Kafka 有开箱即用的 KafkaHealthIndicator 之类的东西,有人有使用它的经验或示例吗?

   public class KafkaHealthIndicator implements HealthIndicator {
   private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

   private KafkaTemplate<String, String> kafka;

   public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
   this.kafka = kafka;
   }

  @Override
  public Health health() {
  try {
     kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
      return Health.down(e).build();
  }
  return Health.up().build();
 }
}

【问题讨论】:

    标签: java spring spring-boot apache-kafka spring-kafka


    【解决方案1】:

    kafkaAdminClient.describeCluster(..) 是测试 Kafka 可用性的地方。

    @Configuration
    public class KafkaConfig {
    
        @Autowired
        private KafkaAdmin kafkaAdmin;
    
        @Bean
        public AdminClient kafkaAdminClient() {
            return AdminClient.create(kafkaAdmin.getConfigurationProperties());
        }
    
        @Bean
        public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
            final DescribeClusterOptions options = new DescribeClusterOptions()
                .timeoutMs(1000);
    
            return new AbstractHealthIndicator() {
                @Override
                protected void doHealthCheck(Health.Builder builder) throws Exception {
                    // When Kafka is not connected, describeCluster() method throws
                    // an exception which in turn sets this indicator as being DOWN.
                    kafkaAdminClient.describeCluster(options);
    
                    builder.up().build();
                }
            };
        }
    
    }
    

    更详细的探针添加:

    DescribeClusterResult clusterDesc = kafkaAdminClient.describeCluster(options);
    builder.up()
        .withDetail("clusterId", clusterDesc.clusterId().get())
        .withDetail("nodeCount", clusterDesc.nodes().get().size())
        .build();
    

    【讨论】:

    • 请我无法访问此行中的选项变量 kafkaAdminClient.describeCluster(options);
    • @James 几乎没有错别字。现在就来看看吧。
    • 谢谢。现在没问题了。但是我收到错误 TimeoutException Timed out waiting to send the call。
    • @James 猜你没有连接到 Kafka
    • 福斯卡。我认为这是一个身份验证问题。你知道我应该向 DescribeClusterOptions 提供哪些身份验证属性
    【解决方案2】:

    使用 AdminClient API 通过描述集群和/或您将与之交互的主题来检查集群的运行状况,并验证这些主题是否具有所需数量的同步副本,例如

    Kafka 有开箱即用的 KafkaHealthIndicator 之类的东西

    它没有。 Spring 的 Kafka 集成可能

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-25
    相关资源
    最近更新 更多