【问题标题】:Spring boot kafka set ackOnError in application.propertiesSpring boot kafka 在 application.properties 中设置 ackOnError
【发布时间】:2019-07-16 15:47:08
【问题描述】:

有没有办法像其他监听器属性一样使用spring boot application.properties文件设置属性ackOnError=false:

spring.kafka.listener.ack-mode
spring.kafka.listener.ack-count
spring.kafka.listener.ack-time
spring.kafka.listener.poll-timeout

?

如果不可能,我如何组合猫:来自文件的属性 + java 配置?我不想像这样在 java-config 中设置所有 kafka 属性:

 @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ......
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

我只想覆盖属性 ackOnError。 提前谢谢你。

【问题讨论】:

    标签: java spring spring-boot spring-kafka


    【解决方案1】:

    它不能作为属性使用,但您可以覆盖 Boot 的容器工厂 @Bean,如下所示...

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setAckOnError(false);
        return factory;
    }
    

    这也将应用所有其他启动属性。

    但是,除非您也停止容器(例如使用ContainerStoppingErrorHandler),否则此设置没有太多实用性。

    这是因为下一条成功的记录无论如何都会提交其偏移量,这超出了失败记录的偏移量。

    也就是说,在 2.3 中,默认为 false

    【讨论】: