【Posted at】: 2017-12-01 11:12:57
【Question】:
I initialize my Spring Kafka listener as follows:
@Bean
public Map<String, Object> consumerConfig() {
    final HashMap<String, Object> result = new HashMap<>();
    result.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    result.put(GROUP_ID_CONFIG, groupId);
    result.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    result.put(VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJacksonRulesExecutionResultsDeserializer.class);
    return result;
}

@Bean
public ConsumerFactory<Long, MessageResult> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageResult> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, MessageResult> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(KAFKA_LISTENER_THREADS_COUNT);
    containerFactory.getContainerProperties().setPollTimeout(KAFKA_LISTENER_POLL_TIMEOUT);
    containerFactory.getContainerProperties().setAckOnError(true);
    containerFactory.getContainerProperties().setAckMode(RECORD);
    return containerFactory;
}
and use it like this:
@KafkaListener(topics = "${spring.kafka.out-topic}")
public void processSrpResults(MessageResult result) {
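The deserializer throws an exception while deserializing a message, and this leads to an infinite loop: the listener never receives the message, so the consumer keeps retrying the same record.
How can I make the Kafka listener commit the offset when such an error occurs, so that it moves past the bad record?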
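To make the failure mode concrete: the exception is raised while the consumer is still fetching the record, before the listener or the container's `setAckOnError(true)` handling is involved, so the offset never advances. One common workaround is to wrap the value deserializer so that a bad payload becomes `null` instead of an exception. The following is only a minimal, dependency-free sketch of that idea. `SimpleDeserializer`, `NullOnErrorDeserializer`, and `NullOnErrorDemo` are hypothetical names invented for illustration; in a real project the wrapper would implement `org.apache.kafka.common.serialization.Deserializer` and delegate to `MyKafkaJacksonRulesExecutionResultsDeserializer`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer interface, reduced to the
// single method that matters here so the sketch compiles without Kafka.
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Wraps another deserializer and converts any runtime failure into a null
// value, so a malformed record no longer throws during fetching.
final class NullOnErrorDeserializer<T> implements SimpleDeserializer<T> {
    private final SimpleDeserializer<T> delegate;

    NullOnErrorDeserializer(SimpleDeserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (RuntimeException e) {
            // Report the problem and let the caller see a null payload.
            System.err.println("Skipping undeserializable record on " + topic + ": " + e);
            return null;
        }
    }
}

final class NullOnErrorDemo {
    // Example delegate (an assumption for the demo): parses the payload as a
    // decimal integer and throws NumberFormatException on anything else.
    static final SimpleDeserializer<Integer> INT_PARSER =
            (topic, data) -> Integer.valueOf(new String(data, StandardCharsets.UTF_8));

    public static void main(String[] args) {
        SimpleDeserializer<Integer> safe = new NullOnErrorDeserializer<>(INT_PARSER);
        System.out.println(safe.deserialize("demo", "42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(safe.deserialize("demo", "oops".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With a wrapper like this, the listener method has to tolerate a `null` payload, for example by returning early. Later Spring for Apache Kafka releases also ship an `ErrorHandlingDeserializer` that serves the same purpose; whether it is available depends on the version in use.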
【Discussion】:
Tags: spring apache-kafka