【Posted】: 2021-04-08 21:13:36
【Question】:
How do I handle deserialization exceptions in batch mode?
I am using Spring Kafka with Spring Boot 2.3.8.
I tried this option:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Apply the spring.kafka.* settings from the application properties.
    configurer.configure(factory, kafkaConsumerFactory);
    // Register a batch error handler on the factory.
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
    return factory;
}
But it throws an exception: Caused by: java.lang.IllegalStateException: Error handler must be an ErrorHandler, not org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler
Here are my application properties:
spring.kafka.consumer.bootstrap-servers = localhost:9093
spring.kafka.consumer.enable-auto-commit = false
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.max-poll-records = 10
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.type = batch
spring.kafka.ack.discard = true
spring.kafka.listener.ack-mode = MANUAL
spring.kafka.listener.concurrency = 1
idle-between-polls = 120000
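For context on what "handling deserialization exceptions" usually involves here: a minimal sketch, assuming Spring Kafka 2.5.x (the line that Spring Boot 2.3.x manages), is to wrap the real deserializers in `ErrorHandlingDeserializer` so that a record that cannot be deserialized does not fail the whole poll. The delegates below simply reuse the `StringDeserializer` from the properties above for illustration. It rarely fails in practice, so a JSON or Avro delegate is the more typical case. This is not a confirmed fix for the exception in the question.

```properties
# Sketch only: assumes Spring Kafka 2.5.x, where ErrorHandlingDeserializer is available.
# The wrapper catches exceptions thrown by the delegate deserializers.
spring.kafka.consumer.key-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# Delegates that do the actual deserialization (illustrative choice, taken from the original config).
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class = org.apache.kafka.common.serialization.StringDeserializer
```

With a batch listener, a record that failed deserialization would be expected to arrive with a null key or value and the exception carried in a record header, so the listener has to check for that itself. Treat this as an assumption to verify against the reference documentation for the exact version in use.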
【Discussion】:
Tags: spring-kafka