【问题标题】:Spring Kafka listener infinite loop on errorSpring Kafka监听器无限循环出错
【发布时间】:2017-12-01 11:12:57
【问题描述】:

我将 Spring Kafka 侦听器初始化为

@Bean
public Map<String, Object> consumerConfig() {
    final HashMap<String, Object> result = new HashMap<>();
    result.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    result.put(GROUP_ID_CONFIG, groupId);
    result.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    result.put(VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJacksonRulesExecutionResultsDeserializer.class);
    return result;
}

@Bean
public ConsumerFactory<Long, MessageResult> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageResult> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, MessageResult> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(KAFKA_LISTENER_THREADS_COUNT);
    containerFactory.getContainerProperties().setPollTimeout(KAFKA_LISTENER_POLL_TIMEOUT);
    containerFactory.getContainerProperties().setAckOnError(true);
    containerFactory.getContainerProperties().setAckMode(RECORD);
    return containerFactory;
}

并用作

@KafkaListener(topics = "${spring.kafka.out-topic}")
public void processSrpResults(MessageResult result) {

反序列化程序在反序列化过程中抛出异常,导致无限循环,因为侦听器无法获取消息。

我怎样才能让 kafka 监听器犯错误?

【问题讨论】:

    标签: spring apache-kafka


    【解决方案1】:

    我为引发异常的反序列化器创建了一个子类。然后我在我的配置中使用它作为反序列化器。然后你的处理器必须处理空对象。

    public class MyErrorHandlingDeserializer extends ExceptionThrowingDeserializer {
    
        @Override
        public Object deserialize(String topic, byte[] data) {
            try {
                return super.deserialize(topic, data);
            } catch (Exception e) {
                log.error("Problem deserializing data " + new String(data) + " on topic " + topic, e);
                return null;
            }
        }
    }
    

    【讨论】:

      【解决方案2】:

      您可以在使用者配置中使用org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 作为值反序列化器。这使您有机会优雅地处理反序列化错误,并允许您提交消费者记录。用法和更多细节可以在https://docs.spring.io/spring-kafka/reference/html/#error-handling-deserializer找到。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-08-16
        • 1970-01-01
        • 2022-01-03
        • 1970-01-01
        • 1970-01-01
        • 2020-10-03
        • 1970-01-01
        • 2021-03-11
        相关资源
        最近更新 更多