【Posted】: 2020-05-27 23:10:35
【Question】:
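I am using spring-kafka 2.2.6 with SeekToCurrentErrorHandler and ErrorHandlingDeserializer2. The SeekToCurrentErrorHandler is currently configured to log the message after three retries. Is there a way to skip the retries for validation errors (caught by a Spring Validator implementation) and for message conversion errors? All errors are intercepted by the container error handler, i.e. SeekToCurrentErrorHandler. Should I override the handle method of SeekToCurrentErrorHandler?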
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((c, e) -> {
        LOG.info(e.getMessage());
    }, this.kafkaConfigProperties.getRetryCount()));
    return factory;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.forEach((key, value) -> {
        map.put((String) key, value);
    });
    KafkaSoapMessageConverter kafkaSoapMessageConverter = new KafkaSoapMessageConverter();
    Map<String, Object> configMap = new HashMap<>(1);
    configMap.put(KafkaSoapMessageConverter.CLASS_TO_DESERIALIZE, MyClass.class);
    kafkaSoapMessageConverter.configure(configMap, false);
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
            kafkaSoapMessageConverter);
    DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
    consumerFactory.setValueDeserializer(errorHandlingDeserializer);
    return consumerFactory;
}
Edit
I used the following code in the overridden handle method:
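// Skip retries for deserialization and validation failures; retry everything else.
if (e instanceof DeserializationException
        || e.getCause() instanceof MethodArgumentNotValidException) {
    // The lambda parameters must not reuse the name "e", which is already in scope.
    SeekUtils.doSeeks(records, consumer, e, true, (rec, ex) -> true, LOG);
} else {
    super.handle(e, records, consumer, container);
}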
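The check above only looks at the exception itself and its immediate cause, so it can miss a failure that the container has wrapped more than once. A minimal sketch of walking the whole cause chain follows. It is plain Java with no Spring dependency, and the class name NonRetryableClassifier and its method are hypothetical, not part of spring-kafka. In the handler above, the types passed in would presumably be DeserializationException and MethodArgumentNotValidException; JDK exceptions stand in for them here so the sketch compiles on its own.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper: decides whether an error should bypass retries
// by searching the full cause chain for one of the configured types.
final class NonRetryableClassifier {

    private final Set<Class<? extends Throwable>> nonRetryable;

    @SafeVarargs
    NonRetryableClassifier(Class<? extends Throwable>... types) {
        this.nonRetryable = new HashSet<>(Arrays.asList(types));
    }

    boolean isNonRetryable(Throwable error) {
        // Identity-based set guards against a cyclic cause chain.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            for (Class<? extends Throwable> type : nonRetryable) {
                // isInstance also matches subclasses of the configured type.
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // IllegalArgumentException is only a stand-in for a validation error.
        NonRetryableClassifier classifier =
                new NonRetryableClassifier(IllegalArgumentException.class);
        Throwable wrapped = new RuntimeException(
                new IllegalStateException(new IllegalArgumentException("invalid payload")));
        System.out.println(classifier.isNonRetryable(wrapped));
        System.out.println(classifier.isNonRetryable(new RuntimeException("broker timeout")));
    }
}
```

With a helper like this, the condition in the handle override could be reduced to a single isNonRetryable(e) call, and new exception types could be added without touching the branching logic.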
【Discussion】:
Tags: java spring-boot apache-kafka spring-kafka