【Question Title】: KafkaStreams is not running. State is ERROR
【Posted】: 2021-01-29 18:31:35
【Question Description】:

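I have a Kafka consumer class that listens to events, joins them (orders and customers), and stores the result in a materialized view. I created another class that accesses the state store when a REST call is received. However, I keep getting java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR. I tried setting the application.server property, but that did not help. The EventsListener class joins the events, and the StateStoreService class queries them. The code is uploaded to GitHub for reference.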

Based on the discussion at https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1500079616414627 and the following note from Matt, I removed the application.server property.

The error means that a consumer subscribed to topic "source-topic" but got partition "test-name-0" (i.e., partition 0 from topic "test-name") assigned. Of course, the consumer did not expect to get a partition of topic "test-name", as it never subscribed to it, and this fails.
In Streams, this can happen if you have two instances of the same application and they subscribe to different topics (which is not allowed).
That is, all instances of an application must have the same processing logic and must subscribe to the same topics. This implies that if you want to consume 2 topics, all instances need to read both topics.

But I still cannot read from the state store. The code gets stuck at the Thread.sleep() call.
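A wait loop that only sleeps will hang forever once the instance has died, which may be what is happening here. Below is a minimal sketch of a bounded wait that gives up on failure or timeout. `StoreWaiter` and its `Status` enum are hypothetical names, not part of the asker's code or of Kafka; in a real application the supplier would presumably map `streams.state()` (for example `KafkaStreams.State.RUNNING` and `KafkaStreams.State.ERROR`) onto these values.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper: polls a status supplier with a deadline instead of sleeping forever. */
final class StoreWaiter {

    /** Simplified stand-in for the states this sketch cares about. */
    enum Status { STARTING, RUNNING, FAILED }

    /**
     * Returns once the supplier reports RUNNING.
     * Throws IllegalStateException on FAILED, on timeout, or if interrupted.
     */
    static void awaitRunning(Supplier<Status> status, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Status current = status.get();
            if (current == Status.RUNNING) {
                return;
            }
            if (current == Status.FAILED) {
                // A failed instance does not recover on its own, so waiting longer is pointless.
                throw new IllegalStateException("Instance failed; look for the stream thread's exception");
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Timed out after " + timeout + " waiting for RUNNING");
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
                throw new IllegalStateException("Interrupted while waiting for RUNNING", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated instance that reports RUNNING on the third poll.
        int[] polls = {0};
        awaitRunning(() -> ++polls[0] >= 3 ? Status.RUNNING : Status.STARTING,
                Duration.ofSeconds(1), Duration.ofMillis(10));
        System.out.println("running after " + polls[0] + " polls");
    }
}
```

Failing fast on the failed status surfaces the problem to the REST caller instead of leaving the request thread blocked.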

Exception:

Exception in thread "cqrs-streams-5d77fdb3-f75e-443d-b7b7-fabedcbc483f-StreamThread-1" java.lang.IllegalArgumentException: Assigned partition order-0 for non-subscribed topic regex pattern; subscription pattern is 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:312)
    at org.apache.kafka.streams.KafkaStreams.allMetadata(KafkaStreams.java:934)
    at com.kafkastream.service.StateStoreService.waitUntilStoreIsQueryable(StateStoreService.java:105)
    at com.kafkastream.service.StateStoreService.getCustomerOrders(StateStoreService.java:59)
    at com.kafkastream.web.EventsController.getCustomerOrders(EventsController.java:101)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)

【Comments】:

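  • Are you using version 0.11 or later?
  • Yes. I'm using Confluent v4.1.
  • The error java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR. means that your KafkaStreams instance has crashed, so you cannot query the store. You need to figure out why it crashed in the first place and fix the root cause. Check the log files, which should contain the actual error that caused KafkaStreams to die.
  • You're right, Matt. I got this whole thing wrong. I found the problem and posted the steps to resolve it.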
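One way to make the root cause easier to find than digging through logs is to record the first exception that kills a thread. The sketch below uses only the JDK; `RootCauseRecorder` is a hypothetical name. As far as I know, in the Kafka 1.x line the asker was on, `KafkaStreams#setUncaughtExceptionHandler` accepts a plain `Thread.UncaughtExceptionHandler`, so a handler like this could be registered before `start()`; newer releases offer a Streams-specific handler type instead, so treat that wiring as an assumption to check against your version.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical handler that remembers the first exception that killed a thread. */
final class RootCauseRecorder implements Thread.UncaughtExceptionHandler {

    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // Keep only the first failure; later ones are often consequences of it.
        if (firstFailure.compareAndSet(null, error)) {
            System.err.println("Thread " + thread.getName() + " died: " + error);
        }
    }

    /** Returns the recorded failure, or null if no thread has died yet. */
    Throwable firstFailure() {
        return firstFailure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        RootCauseRecorder recorder = new RootCauseRecorder();
        // Stand-in for a stream thread that dies with the exception type from the question.
        Thread worker = new Thread(() -> {
            throw new IllegalArgumentException("Assigned partition order-0 for non-subscribed topic");
        }, "demo-StreamThread-1");
        worker.setUncaughtExceptionHandler(recorder);
        worker.start();
        worker.join();
        System.out.println("root cause: " + recorder.firstFailure().getMessage());
    }
}
```

A REST handler could then include `firstFailure()` in its error response, so the caller sees the original IllegalArgumentException rather than only the follow-on IllegalStateException.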

Tags: java apache-kafka apache-kafka-streams


【Solution 1】:

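"KafkaStreams is not running. State is ERROR": I got this error because I had not defined a sink topic for my streaming application. Once I defined the sink topic, the error went away.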

【Discussion】:

    【Solution 2】:

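    The problem was caused by a wrong approach. The REST proxy should be started by the consumer class, and the second class should fetch its data from the REST API served by the Jetty server.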

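    1. Start the REST proxy from the EventsListener class (just run the EventsListener class).
    2. All customer orders should then be accessible at http://localhost:8095/customer-orders/all
    3. Start the SpringBoot class and try to access http://localhost:8090/customerOrders/CU9978. This should connect to the REST proxy started by the EventsListener class.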
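The steps above imply that the second process talks to the first over HTTP rather than opening the state store itself. A rough sketch of such a call, using only the JDK, might look like the following. `CustomerOrdersClient` is a hypothetical name and the response is treated as opaque text, since the payload format is not shown in the post; only the host, port, and path come from step 2.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the REST endpoint exposed by the EventsListener process. */
final class CustomerOrdersClient {

    private final String baseUrl;

    CustomerOrdersClient(String baseUrl) {
        // Drop a trailing slash so path joining stays predictable.
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    /** URL for all customer orders, matching the path in step 2. */
    String allOrdersUrl() {
        return baseUrl + "/customer-orders/all";
    }

    /** Issues a GET and returns the response body as text. */
    String fetchAllOrders() throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(allOrdersUrl()).openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(2_000);
        connection.setReadTimeout(5_000);
        try {
            int status = connection.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("Unexpected HTTP status " + status + " from " + allOrdersUrl());
            }
            StringBuilder body = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line).append('\n');
                }
            }
            return body.toString();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Requires the EventsListener process to be running locally.
        CustomerOrdersClient client = new CustomerOrdersClient("http://localhost:8095");
        System.out.println(client.fetchAllOrders());
    }
}
```

The explicit timeouts keep the SpringBoot side from hanging if the EventsListener process is down.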

    【Discussion】:
