【Question title】: Spring Boot Kafka - Kafka metrics not available in /actuator/prometheus
【Posted】: 2021-06-16 18:41:54
【Question】:

I want to monitor Kafka metrics, but the /actuator/prometheus endpoint exposes nothing Kafka-related. Is something missing from my setup?

Application dependencies: Kotlin 1.4.31, Spring Boot 2.3.9, Spring Kafka 2.6.7, Reactor Kafka 1.2.5, Kafka Clients 2.5.1

Application configuration:

    management:   
      server:
        port: 8081   
      endpoints:
        web:
          exposure:
            include: health,info,metrics,prometheus
    
    spring:
      jmx:
        enabled: true
      kafka:
        bootstrap-servers: ...
        consumer:
          group-id: my-service
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          ack-mode: manual
        ssl:
          key-store-location: ...
          key-store-password: ...
        security:
          protocol: SSL

My receiver looks like this:

    @Bean
    fun someEventReceiver(): SomeEventReceiver =
        KafkaReceiver.create(
            ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                .withValueDeserializer(SomeEventDeserializer())
                .subscription(listOf(serviceProperties.kafka.topics.someevent))
        )

and the listener:

    @EventListener(ApplicationStartedEvent::class)
    fun onSomeEvent() {
        someEventReceiver
            .receive()
            .groupBy { it.receiverOffset().topicPartition() }
            .publishOn(Schedulers.boundedElastic())
            .flatMap { someEvent ->
                someEvent
                    .publishOn(Schedulers.boundedElastic())
                    .delayUntil(::handleEvent)
                    .doOnNext { it.receiverOffset().acknowledge() }
                    .retryWhen(Retry.backoff(10, Duration.ofMillis(100)))
            }
            .retryWhen(Retry.indefinitely())
            .subscribe()
    }

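【Comments】:

    Tags: spring-boot prometheus spring-kafka spring-actuator reactor-kafka


    【Solution 1】:

    Unlike spring-kafka, reactor-kafka currently has no Micrometer integration.

    If you also have spring-kafka on the classpath, you can use its MicrometerConsumerListener to bind a KafkaClientMetrics to the meter registry (or you can do the binding yourself).

    Here is an example using the Spring listener: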

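    import java.util.Collections;
    import java.util.Map;

    import io.micrometer.core.instrument.MeterRegistry;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.MicrometerConsumerListener;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    @SpringBootApplication
    public class So66706766Application {

        public static void main(String[] args) {
            SpringApplication.run(So66706766Application.class, args);
        }

        @Bean
        ApplicationRunner runner(MicrometerConsumerListener<String, String> consumerListener) {
            return args -> {
                ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                            Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                    ConsumerConfig.GROUP_ID_CONFIG, "so66706766"))
                        .withKeyDeserializer(new StringDeserializer())
                        .withValueDeserializer(new StringDeserializer())
                        .subscription(Collections.singletonList("so66706766"));
                KafkaReceiver<String, String> receiver = KafkaReceiver.create(ro);
                // Start consuming: print each record and acknowledge its offset.
                receiver.receive()
                        .doOnNext(rec -> {
                            System.out.println(rec.value());
                            rec.receiverOffset().acknowledge();
                        })
                        .subscribe();
                // Hand the underlying consumer to the Spring listener, which binds its metrics to the registry.
                receiver.doOnConsumer(consumer -> {
                    consumerListener.consumerAdded("myConsumer", consumer);
                    return Mono.empty();
                }).subscribe();
            };
        }

        @Bean
        MicrometerConsumerListener<String, String> consumerListener(MeterRegistry registry) {
            return new MicrometerConsumerListener<>(registry);
        }

        @Bean
        NewTopic topic() {
            return TopicBuilder.name("so66706766").partitions(1).replicas(1).build();
        }

    }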
    

    # HELP kafka_consumer_successful_authentication_total The total number of connections with successful authentication
    # TYPE kafka_consumer_successful_authentication_total counter
    kafka_consumer_successful_authentication_total{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.0
    # HELP jvm_gc_live_data_size_bytes Size of long-lived heap memory pool after reclamation
    # TYPE jvm_gc_live_data_size_bytes gauge
    jvm_gc_live_data_size_bytes 0.0
    # HELP kafka_consumer_connection_creation_rate The number of new connections established per second
    # TYPE kafka_consumer_connection_creation_rate gauge
    kafka_consumer_connection_creation_rate{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.07456936193482637
    ...
    

    I opened an issue: https://github.com/reactor/reactor-kafka/issues/206
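
For the "do the binding yourself" route mentioned in this answer, a minimal sketch in Java might look like the following. It assumes Micrometer's `KafkaClientMetrics` binder (shipped in `micrometer-core`) is on the classpath; the `ConsumerMetricsBinder` class, its `bind` method, and the `consumer.id` tag are hypothetical names used only for illustration.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.consumer.Consumer;

import java.util.Collections;

// Hypothetical helper: binds a Kafka consumer's client metrics to a Micrometer registry
// without going through spring-kafka's MicrometerConsumerListener.
public final class ConsumerMetricsBinder {

    private ConsumerMetricsBinder() {
    }

    // Returns the binder so the caller can close() it when the consumer is no longer used.
    public static KafkaClientMetrics bind(Consumer<?, ?> consumer, MeterRegistry registry, String consumerId) {
        // "consumer.id" is an illustrative tag name, not one mandated by Micrometer.
        KafkaClientMetrics metrics = new KafkaClientMetrics(
                consumer, Collections.singletonList(Tag.of("consumer.id", consumerId)));
        metrics.bindTo(registry);
        return metrics;
    }
}
```

With reactor-kafka, this could be called from inside `receiver.doOnConsumer(...)`, in the same place where the example in this answer calls `consumerListener.consumerAdded(...)`. Calling `close()` on the returned binder should remove its meters from the registry once the consumer goes away.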

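    【Comments】:

      【Solution 2】:

      Following @gary-russell's suggestion (thanks again for the help!), I took a slightly different approach to wiring up the listener to reduce the amount of code, since my project has many consumers.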

          class KafkaReceiverWithMetrics<K, V>(
              private val receiver: KafkaReceiver<K, V>,
              private val consumerId: String,
              private val metricsListener: MicrometerConsumerListener<K, V>,
          ) : KafkaReceiver<K, V> by receiver {
              override fun receive(): Flux<ReceiverRecord<K, V>> =
                  receiver.receive()
                      .doOnSubscribe {
                          receiver
                              .doOnConsumer { consumer -> metricsListener.consumerAdded(consumerId, consumer) }
                              .subscribe()
                      }
          }
      

      Then I only need to provide one bean per listener:

          @Bean
          fun someEventReceiver(): SomeEventReceiver =
              KafkaReceiverWithMetrics(
                  KafkaReceiver.create(
                      ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                          .withValueDeserializer(SomeEventDeserializer())
                          .subscription(listOf(topics.someEvent))
                  ),
                  topics.someEvent,
                  MicrometerConsumerListener(meterRegistry)
              )
      

      【Comments】:

      • Does doOnSubscribe really work here? ... For me it reports that the consumer is null.
      • It does. I'm running it in production.