【发布时间】:2019-06-24 01:13:34
【问题描述】:
我正在尝试使用 spring-kafka 版本 2.3.0.M2 库编写 kafka 消费者。 为了处理运行时错误,我使用 SeekToCurrentErrorHandler.class 和 DeadLetterPublishingRecoverer 作为我的恢复器。这仅在我的消费者代码引发异常时才有效,但在无法反序列化消息时失败。
我尝试自己实现 ErrorHandler 并且我成功了,但是通过这种方法,我自己最终编写了 DLT 代码来处理我不想做的错误消息。
以下是我的 kafka 属性
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.mypackage
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}
【问题讨论】:
-
请提供比“它失败”更多的信息。失败怎么办?
-
我的意思是当我以错误的格式发送消息并且程序突然停止时,它会引发反序列化异常。异常是预期的,但我希望它被 ErrorHandler 捕获并将消息按原样放在 DLT 主题中
-
框架中没有任何东西会“突然停止程序”。我会试着找点时间来测试一下。
-
请忽略“突然”这个词,它只是抛出异常但不会停止应用程序
标签: spring-kafka