【发布时间】:2021-01-29 18:31:35
【问题描述】:
我有一个 Kafka 消费者类,它监听事件并在事件(订单、客户)之间执行连接并将它们存储在物化视图中。当收到 REST 调用时,我创建了另一个类来访问状态存储。但我越来越 java.lang.IllegalStateException:KafkaStreams 未运行。状态为 ERROR。 我尝试分配 application.server 属性,但没有成功。 EventsListener 类加入事件,StateStoreService 类查询事件。代码上传到Github供参考。
基于https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1500079616414627 的讨论以及来自 Matt 的以下注释,我删除了 application.server 属性。
The error means, that a consumer subscribed to topic "source-topic" but got partition "test-name-0" (ie, partition 0 from topic "test-name") assigned. Of course, the consumer did not expect to get a partition of topic "test-name" as it never subscribed to it and this is fails.
In Streams, this can happen if you have two application instances of the same application, and both subscribe to different topics (what is not allowed).
Ie, all instanced of an application must have the same processing logic and must subscribe to the same topics. This implies that if you want to consume 2 topics, all instances need to read both topics.
但我仍然无法从状态存储中读取。代码卡在 thread.sleep() 方法。
例外:
Exception in thread "cqrs-streams-5d77fdb3-f75e-443d-b7b7-fabedcbc483f-StreamThread-1" java.lang.IllegalArgumentException: Assigned partition order-0 for non-subscribed topic regex pattern; subscription pattern is
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:312)
at org.apache.kafka.streams.KafkaStreams.allMetadata(KafkaStreams.java:934)
at com.kafkastream.service.StateStoreService.waitUntilStoreIsQueryable(StateStoreService.java:105)
at com.kafkastream.service.StateStoreService.getCustomerOrders(StateStoreService.java:59)
at com.kafkastream.web.EventsController.getCustomerOrders(EventsController.java:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
【问题讨论】:
-
您使用的是 .11 或更高版本吗?
-
是的。我正在使用融合 v4.1
-
错误
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.表示您的KafkaStreams实例已崩溃,因此您无法查询商店。您需要弄清楚为什么它首先崩溃并解决根本原因。检查应该包含导致 KafkaStreams 死亡的实际错误的日志文件。 -
你说得对,马特。我把这整件事弄错了。我发现了问题并发布了解决此问题的步骤。
标签: java apache-kafka apache-kafka-streams