【问题标题】:Kafka configuration for Spring Integration on Spring BootSpring Boot 上 Spring 集成的 Kafka 配置
【发布时间】:2018-02-20 12:30:23
【问题描述】:

我有一个工作原型 Spring Boot 应用程序,它侦听 Kafka 队列。除了application.yml 中的配置之外,只需要一个带有@KafkaListener 注释的MessageListener 实现即可。

我现在正在引入 Spring Integration,并为此配置了这些 bean:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = ...; // set proerties
    return new DefaultKafkaConsumerFactory<>(props);
}

应用程序未启动,并抛出此错误:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

即使我已经定义了 ConsumerFactory bean。

在调试模式下运行,很明显 Boot 正在加载一个 KafkaListenerEndpointContainer bean 来监听代理:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator localhost:9092 (id: 2147483999 rack: null) for group my_group.

然后:

  KafkaAnnotationDrivenConfiguration matched:
      - @ConditionalOnClass found required class 'org.springframework.kafka.annotation.EnableKafka'; @ConditionalOnMissingClass did not find unwanted class (OnClassCondition)

   KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory matched:
      - @ConditionalOnMissingBean (names: kafkaListenerContainerFactory; SearchStrategy: all) did not find any beans (OnBeanCondition)

   KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactoryConfigurer matched:
      - @ConditionalOnMissingBean (types: org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; SearchStrategy: all) did not find any beans (OnBeanCondition)

   KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration matched:
      - @ConditionalOnMissingBean (names: org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor; SearchStrategy: all) did not find any beans (OnBeanCondition)

   KafkaAutoConfiguration matched:
      - @ConditionalOnClass found required class 'org.springframework.kafka.core.KafkaTemplate'; @ConditionalOnMissingClass did not find unwanted class (OnClassCondition)

   KafkaAutoConfiguration#kafkaProducerFactory matched:
      - @ConditionalOnMissingBean (types: org.springframework.kafka.core.ProducerFactory; SearchStrategy: all) did not find any beans (OnBeanCondition)

   KafkaAutoConfiguration#kafkaProducerListener matched:
      - @ConditionalOnMissingBean (types: org.springframework.kafka.support.ProducerListener; SearchStrategy: all) did not find any beans (OnBeanCondition)

   KafkaAutoConfiguration#kafkaTemplate matched:
      - @ConditionalOnMissingBean (types: org.springframework.kafka.core.KafkaTemplate; SearchStrategy: all) did not find any beans (OnBeanCondition)

我认为正在发生的事情是 Spring Boot\Kafa 自动配置与 Spring Integration\Kafka 设置发生冲突。解决这个问题的正确方法是什么?

谢谢

【问题讨论】:

    标签: spring-boot spring-integration spring-kafka


    【解决方案1】:

    你可以使用 Boot 的 Consumer factory...

    @Bean
    public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
        ...
    }
    

    或者禁用kafka自动配置

    @SpringBootApplication(exclude = KafkaAutoConfiguration.class)
    

    【讨论】:

      猜你喜欢
      • 2021-08-02
      • 2019-07-14
      • 1970-01-01
      • 2016-12-22
      • 1970-01-01
      • 2019-08-10
      • 2023-03-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多