【发布时间】:2020-03-11 02:06:56
【问题描述】:
在我的监听器中消费消息后,如果发生任何异常,那么我会抛出一个异常。如果它成功,那么我承认。但即使抛出异常,偏移量也不会被设置。即重试没有按预期发生。错误事件不会再次出现。
我还看到我没有使用所有预期的消息。是不是我做错了什么?
@Bean
public ConcurrentKafkaListenerContainerFactory<String,Result<FormatException,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Result<FormatException, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler(3));
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<String, Result<FormatException, String>> consumerFactory() {
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
监听类
@KafkaListener(topics = "${gpc.consumer.topic}")
public void listenWithHeadersGPC(ConsumerRecord<String, Result<FormatException, String>> consumerRecord,
Acknowledgment acknowledgment) {
try {
String body = consumerRecord.value().get();
log.logInfo("Received message for " + GPCSourceName + ": " + body);
log.logInfo("consumption successful from kafka. partition="+consumerRecord.partition()+", "
+ "offset="+consumerRecord.offset());
if(body.contains("berlin+braze@gmail.com")) {
throw new RuntimeException("custom exception");
}
log.logInfo("Filtered message for " + GPCSourceName + " : " + body);
acknowledgment.acknowledge();
}catch(Exception e) {
log.logError("Exception occurred in listenWithHeadersGPC: ", e);
throw e;
}
}
如果没有例外,我会在我的听众中使用acknowledgment.acknowledge();
编辑:Spring Kafka 版本:2.2.8
【问题讨论】: