【问题标题】:Changing default container factory bean from kafkaListenerContainerFactory to my custom container factory将默认容器工厂 bean 从 kafkaListenerContainerFactory 更改为我的自定义容器工厂
【发布时间】:2026-02-05 08:55:02
【问题描述】:

在 spring-kafka 文档中,我们可以读到默认容器工厂被假定为可用的 bean 名称为 kafkaListenerContainerFactory,除非通过配置提供了明确的默认值。

我想问是否可以更改配置以使用我的自定义容器工厂 bean(例如。customKafkaListenerContainerFactory)而不是kafkaListenerContainerFactory

代码示例 -> 如果我们输入

@KafkaListener(id = "cat", topics = "myTopic")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

那么默认的 containerFactory bean 是 customKafkaListenerContainerFactory 而不是 kafkaListenerContainerFactory

更准确地说 -> 如果我不提供任何 containerFactory 属性,则使用 customKafkaListenerContainerFactory 而不是 kafkaListenerContainerFactory

【问题讨论】:

    标签: java spring spring-boot spring-kafka


    【解决方案1】:

    是的,您可以通过在@KafkaListener 注释中使用containerFactory 属性,您可以设置自定义kafka 容器工厂bean

    KafkaListenerContainerFactory 的 bean 名称,用于创建负责为该端点提供服务的消息侦听器容器。

    @KafkaListener(id = "cat", topics = "myTopic", containerFactory="customKafkaListenerContainerFactory")
    public void listen(String data, Acknowledgment ack) {
       ...
      ack.acknowledge();
    }
    

    或者您可以覆盖 Config 类中的默认 kafkaListenerContainerFactory。正如@Gary Russell 所说,只需使用相同的 bean 名称,它将替换 Boot's,条件是存在具有该名称的 bean

    @Configuration
    @EnableKafka
    public class Config {
    
       @Bean
       @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
       ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory,consumerFactory());
        // set custom properties
        return factory;
       }
    
       @Bean
       public ConsumerFactory<Integer, String> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs());
       }
    
       @Bean
       public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
      }
    }
    

    【讨论】:

    • 我不想使用containerFactory 属性。我只想将默认 kafkaListenerContainerFactory 更改为customKafkaListenerContainerFactory。更准确地说 -> 如果我不提供任何 containerFactory 属性,则使用 customKafkaListenerContainerFactory 而不是 kafkaListenerContainerFactory
    • 你可以在配置类中覆盖@Syma
    • 使用相同的bean名称即可;它将替换 Boot 的条件,条件是存在名称为 @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory&lt;?, ?&gt; kafkaListenerContainerFactory(...) 的 bean。除非特别设置,否则注释始终查找 kafkaListenerContainerFactory
    • @Deadpool - 最好在自定义 bean 定义中使用 ConcurrentKafkaListenerContainerFactoryConfigurer configurer,这样您就可以应用引导配置属性,然后设置其他属性。
    • 谢谢@GaryRussell 为我指出正确的方向,让我更新答案