【问题标题】:The correct way for creation of KafkaTemplate in spring bootspring boot中创建KafkaTemplate的正确方法
【发布时间】:2019-08-12 07:01:30
【问题描述】:

我尝试在 spring boot 应用程序中配置 apache kafka。我阅读了此documentation 并按照以下步骤操作:

1) 我将此行添加到aplication.yaml:

spring:
  kafka:
    bootstrap-servers: kafka_host:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

2) 我创建新主题:

    @Bean
    public NewTopic responseTopic() {
        return new NewTopic("new-topic", 5, (short) 1);
    }

现在我想使用KafkaTemplate:

private final KafkaTemplate<String, byte[]> kafkaTemplate;

public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

但 Intellij IDE 亮点:

要解决这个问题,我需要创建 bean:

@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

并传递给构造函数属性greetingProducerFactory()

@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

但是,如果我需要创建 ProducerFactory 手册,那么在 application.yaml 中进行设置有什么意义呢?

【问题讨论】:

    标签: java spring spring-boot apache-kafka spring-kafka


    【解决方案1】:

    我认为您可以放心地忽略 IDEA 的警告;我在 Boot 的模板中使用不同的泛型类型连接没有问题...

    @SpringBootApplication
    public class So55280173Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So55280173Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template, Foo foo) {
            return args -> {
                template.send("so55280173", "foo");
                if (foo.template == template) {
                    System.out.println("they are the same");
                }
            };
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so55280173", 1, (short) 1);
        }
    
    }
    
    @Component
    class Foo {
    
        final KafkaTemplate<String, String> template;
    
        @Autowired
        Foo(KafkaTemplate<String, String> template) {
            this.template = template;
        }
    
    }
    

    they are the same
    

    【讨论】:

    • 我收到错误消息:Failed to construct kafka producer 尝试发送消息时
    • 这与接线无关;我建议您提出一个新问题,提供更多信息、堆栈跟踪等。
    • 我混淆了密钥的序列化程序和反序列化程序类,因此出现错误Failed to construct kafka producer。现在可以了,但是IDEA还是显示warning(因为这个,我不是很懂,认为需要手动创建)
    • 听起来IDEA比spring更严格:(
    【解决方案2】:

    默认情况下,KafkaTemplate&lt;Object, Object&gt; 由 Spring Boot 在KafkaAutoConfiguration class 中创建。由于 Spring 在依赖注入期间考虑泛型类型信息,因此无法将默认 bean 自动装配到 KafkaTemplate&lt;String, byte[]&gt;

    【讨论】:

    • 我尝试注入不同的东西:KafkaTemplate&lt;Object, Object&gt;,只是KafkaTemplate - 在所有情况下都是亮点
    • KafkaTemplate 是原始类型,这会对您的课程产生副作用。最好使用KafkaTemplate&lt;?, ?&gt;KafkaTemplate&lt;Object, Object&gt;
    • Could not autowire. No beans of 'KafkaTemplate&lt;Object, Object&gt;' type found.
    • 您是否删除了自己的myMessageKafkaTemplate bean?
    • 我相信你可以忽略 IDEA 的警告——看我的回答。
    【解决方案3】:

    我一开始也遇到了同样的问题,但是当我执行时它没有出现任何错误并且工作正常。

    忽略 Intellij IDEA 的警告,这可能是 IDEA 的错误,无法识别自动装配的组件。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-10-04
      • 1970-01-01
      • 2018-11-07
      • 2017-07-23
      • 1970-01-01
      • 1970-01-01
      • 2020-04-02
      • 1970-01-01
      相关资源
      最近更新 更多