【Question Title】: Suppress stacktrace logging in Spring Kafka container errors
【Posted】: 2021-12-30 08:48:43
【Question】:

I have a simple Spring Boot application that consumes from a Kafka topic, and I configure my container factory with a SeekToCurrentErrorHandler:

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    ...
    final SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(
            (consumerRecord, e) -> log.error("Error reading from topic: {}, Offset: {}, Partition: {}, Key: {}, Exception: {}",
                    consumerRecord.topic(),
                    consumerRecord.offset(),
                    consumerRecord.partition(),
                    consumerRecord.key(),
                    ExceptionUtils.getRootCauseMessage(e)),
            new FixedBackOff(1000L, 3L));
    /* This line is intentional, as it's easier for me to simulate the issue this way. */
    seekToCurrentErrorHandler.removeNotRetryableException(DeserializationException.class);
    factory.setErrorHandler(seekToCurrentErrorHandler);
    ...
}

The SeekToCurrentErrorHandler prints the entire stack trace of any exception that occurs before poll() returns to the listener (a DeserializationException in this case).

This produces a huge number of log entries. Is there a way to suppress this behavior? I tried the trick from the documentation, but it only changes the log level and does not suppress the stack trace:

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

I have upgraded all dependencies to the latest versions.

  • Spring Boot - 2.5.7
  • Spring - 5.3.13
  • Spring Kafka - 2.7.9
  • Kafka Clients - 2.8.1

Stack trace:

2021-11-19 12:37:53.047  INFO 560 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-MyListenerGroup-Testing-1, groupId=MyListenerGroup-Testing] Resetting offset for partition some-topic-0 to position FetchPosition{offset=7763, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[someserver.testing.net:9092 (id: 30 rack: null)], epoch=8}}.
2021-11-19 12:37:54.554 DEBUG 560 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 2 records
2021-11-19 12:37:55.570  INFO 560 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-MyListenerGroup-Testing-1, groupId=MyListenerGroup-Testing] Seeking to offset 7763 for partition some-topic-0
2021-11-19 12:37:55.578 ERROR 560 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 27
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:206) ~[spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2371) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2240) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2154) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2036) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1709) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163) [spring-kafka-2.7.9.jar:2.7.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_292]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_292]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 27
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2387) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2434) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2302) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2229) [spring-kafka-2.7.9.jar:2.7.9]
    ... 9 common frames omitted
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 27
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserializationException(ErrorHandlingDeserializer.java:216) ~[spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:191) ~[spring-kafka-2.7.9.jar:2.7.9]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.8.1.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_292]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_292]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_292]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_292]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-5.3.13.jar:5.3.13]
    at com.sun.proxy.$Proxy140.poll(Unknown Source) ~[na:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) [spring-kafka-2.7.9.jar:2.7.9]
    ... 4 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 27
Caused by: org.apache.avro.AvroTypeException: Found com.foo.bar.CustomerRelationshipStateNew, expecting com.foo.bar.CustomerRelationshipStateNew, missing required field Status
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:309) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:128) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:239) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.9.2.jar:1.9.2]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.9.2.jar:1.9.2]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:351) ~[kafka-avro-serializer-5.5.6.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:99) ~[kafka-avro-serializer-5.5.6.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:78) ~[kafka-avro-serializer-5.5.6.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.5.6.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.8.1.jar:na]
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:188) ~[spring-kafka-2.7.9.jar:2.7.9]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.8.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.8.1.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_292]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_292]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_292]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_292]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.13.jar:5.3.13]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-5.3.13.jar:5.3.13]
    at com.sun.proxy.$Proxy140.poll(Unknown Source) ~[na:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) [spring-kafka-2.7.9.jar:2.7.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163) [spring-kafka-2.7.9.jar:2.7.9]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_292]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_292]

【Discussion】:

  • Can you show some logs so we can determine exactly which place emits the unwanted stack trace?
  • @ArtemBilan Updated my original post with the stack trace.

Tags: java spring spring-boot apache-kafka spring-kafka


【Solution 1】:

OK. I see now.

So, I suggest you implement your own RemainingRecordsErrorHandler and, from this method, delegate to a SeekToCurrentErrorHandler:

public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
        Consumer<?, ?> consumer, MessageListenerContainer container) {

This way you can catch any exceptions from that delegate and rethrow them however you need, for example with those stack traces removed from the KafkaExceptions.
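A minimal, framework-free sketch of that catch-strip-rethrow idea (the spring-kafka wiring is omitted; `delegate` is a stand-in for the call to the wrapped SeekToCurrentErrorHandler, and emptying the stack traces is just one possible way to "remove" the noise while still rethrowing):

```java
/**
 * Sketch of the catch-strip-rethrow pattern. In a real error handler,
 * delegate.run() would be the seekToCurrentErrorHandler.handle(...) call.
 */
public class RethrowSketch {

    static void handleWithDelegate(Runnable delegate) {
        try {
            delegate.run();
        } catch (RuntimeException e) {
            // Drop the frames of the whole cause chain but keep the messages,
            // so the container's ERROR log shrinks to the message lines only.
            for (Throwable t = e; t != null; t = t.getCause()) {
                t.setStackTrace(new StackTraceElement[0]);
            }
            throw e; // rethrow so the container still performs its seek/retry logic
        }
    }

    public static void main(String[] args) {
        try {
            handleWithDelegate(() -> {
                throw new IllegalStateException("wrapper", new RuntimeException("root cause"));
            });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " / frames: " + e.getStackTrace().length);
        }
    }
}
```

The rethrown exception keeps its type and message (which the container needs for its seek logic), but a logging framework that prints it will emit only the message lines, since every throwable in the chain now has zero frames.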

【Discussion】:

  • Is this what you are recommending for my error handler implementation? @Override public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) { try { this.seekToCurrentErrorHandler.handle(thrownException, records, consumer); } catch (Exception e) { // Ignore } }
  • Well, close. Probably not ignore, since you still need to rethrow something to keep the listener container happy.
  • Are you suggesting implementing RemainingRecordsErrorHandler or ContainerAwareErrorHandler? RemainingRecordsErrorHandler has no handle method with a container parameter, and any call to SeekToCurrentErrorHandler.handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) will go to ContainerAwareErrorHandler.handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer), which throws UnsupportedOperationException.
  • See this: SeekToCurrentErrorHandler extends FailedRecordProcessor implements ContainerAwareErrorHandler. So you do indeed need to implement ContainerAwareErrorHandler to delegate properly.
  • But won't rethrowing the exception in MyErrorHandler.handle make the ListenerContainer print the stack trace again?
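As a separate, logging-side mitigation (this is an assumption about the setup, namely that Logback, Spring Boot's default backend, is in use), Logback's %ex conversion word can cap how many frames of each throwable are printed, regardless of what the error handler rethrows:

```xml
<!-- logback-spring.xml: print at most 3 stack frames per throwable -->
<encoder>
    <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{40} : %msg%n%ex{3}</pattern>
</encoder>
```

Note that this shortens every logged stack trace application-wide, not just the container's, so it is a blunter instrument than a custom error handler.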