【问题标题】:Kafka Streams 应用程序中的 Kafka Metrics NullPointerException
【发布时间】:2022-01-23 04:04:53
【问题描述】:

我正在使用 Spring Boot 和 Kafka Streams,虽然我没有使用 Spring Cloud Stream。

我确实使用StreamsBuilderFactoryBean 并像这样设置KafkaStreamsMicrometerListener

 @Bean
    public StreamsBuilderFactoryBean streamsApp() {
     
        StreamsConfig streamsConfig = new StreamsConfig(props);
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
streamsBuilder.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
                Collections.singletonList(new ImmutableTag("stream_app_id", streamingAppName))));
        return streamsBuilder;
    }

但是当我启动我的应用程序时,我遇到了以下 NPE:

java.util.ConcurrentModificationException: null\n\tat java.util.HashMap$HashIterator.nextNode(未知来源)\n\tat java.util.HashMap$ValueIterator.next(未知来源)\n\tat org.apache.kafka.streams.processor.internals.ClientUtils.producerMetrics(ClientUtils.java:99)\n\tat org.apache.kafka.streams.processor.internals.ActiveTaskCreator.producerMetrics(ActiveTaskCreator.java:282)\n\tat org.apache.kafka.streams.processor.internals.Tasks.producerMetrics(Tasks.java:291)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.producerMetrics(TaskManager.java:1309)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.producerMetrics(StreamThread.java:1212)\n\tat org.apache.kafka.streams.KafkaStreams.lambda$metrics$5(KafkaStreams.java:546)\n\tat org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1596)\n\tat org.apache.kafka.streams.KafkaStreams.metrics(KafkaStreams.java:545)\n\tat io.micrometer.core.instrument.binder.kafka.KafkaMetrics.lambda$toMetricValue$4(KafkaMetrics.java:244)\n\tat io.micrometer.core.instrument.cumulative.CumulativeFunctionCounter.count(CumulativeFunctionCounter.java:39)\n\tat io.micrometer.prometheus.PrometheusMeterRegistry.lambda$newFunctionCounter$10(PrometheusMeterRegistry.java:312)\n\tat io.micrometer.prometheus.MicrometerCollector.collect(MicrometerCollector.java:70)\n\tat io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:207)\n\tat io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:240)\n\tat io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:161)\n\tat io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:65)\n\tat org.springframework.boot.actuate.metrics.export.prometheus.TextOutputFormat$2.write(TextOutputFormat.java:57)\n\tat org.springframework.boot.actuate.metrics.export.prometheus.PrometheusScrapeEndpoint.scrape(PrometheusScrapeEndpoint.java:58)\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(未知来源)\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke(未知来源)\n\tat jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(未知来源)\n\tat java.lang.reflect.Method.invoke(未知来源)\n\tat org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)\n\tat org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)\n\tat org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)\n\tat org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$ServletWebOperationAdapter.handle(AbstractWebMvcEndpointHandlerMapping.java:291)\n\tat org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(AbstractWebMvcEndpointHandlerMapping.java:376)\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(未知来源)\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke(未知来源)\n\tat jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(未知来源)\n\tat java.lang.reflect.Method.invoke(未知来源)\n\tat org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)\n\tat org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)\n\tat org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)\n\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)\n\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)\n\tat org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)\n\tat org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)\n\tat org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)\n\tat org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)\n\tat org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:655)\n\tat org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:764)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)\n\tat org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)\n\tat org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)\n\tat org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)\n\tat org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)\n\tat org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)\n\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)\n\tat org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)\n\tat org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)\n\tat org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)\n\tat org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)\n\tat org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)\n\tatorg.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)\n\tat org.apache.catalina.valves .RemoteIpValve.invoke(RemoteIpValve.java:769)\n\tat org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)\n\tat org.apache.coyote.http11.Http11Processor.service(Http11Processor .java:382)\n\tat org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)\n\tat org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)\n\ tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726)\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\n\ tat org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)\n\tat org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)\n\ tat org.apache.tomcat.util.threads.TaskThread $WrappingRunnable.run(TaskThread.java:61)\n\tat java.lang.Thread.run(Unknown Source)\n"}

更新:

我正在使用以下依赖项:

  • 弹簧靴 2.5.5
  • 微米 1.7.4
  • apache kafka 流 3.0.0

感谢任何帮助!谢谢

【问题讨论】:

    标签: spring-boot apache-kafka prometheus apache-kafka-streams micrometer


    【解决方案1】:

    似乎org.apache.kafka.streams.KafkaStreams.metrics(KafkaStreams.java:545) 调用失败,这似乎是 Kafka 客户端问题机器人 Spring Boot/Prometheus/Micrometer 之一。

    您没有提到您使用的任何版本,但从堆栈跟踪中我认为您的 Micrometer 版本已经很旧了。由此,我假设您的 Spring Boot 和 Kafka 客户端也很旧,所以首先我会尝试升级依赖项,希望问题已在 Kafka 客户端中得到解决。

    【讨论】:

    • 我已经用我正在使用的依赖项的版本更新了我的问题。我不认为它们真的那么旧。
    猜你喜欢
    • 2018-07-17
    • 2016-12-27
    • 1970-01-01
    • 1970-01-01
    • 2019-04-09
    • 1970-01-01
    • 1970-01-01
    • 2021-03-01
    • 1970-01-01
    相关资源
    最近更新 更多