问题:springboot 是否总是需要创建 KafkaTemplate 类型的 bean?
下面的详细信息/堆栈跟踪/代码库,请告诉我我做错了什么。谢谢
- 我一直在向一个 Spring Boot 项目的主题发布消息
- 为了创建回调机制,我使用了 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) 来发送消息并创建回调
- 我这样做的原因是因为在使用 KafkaTemplate 时,listenablefuture 仅在失败时提供异常(并且我想在我的所有用例中将回调注册为单独的可重用类)
- 但是,当我没有定义 KafkaTemplate 类型的 bean 并出现以下错误时,spring 无法启动
原因:org.springframework.beans.factory.UnsatisfiedDependencyException:在类路径资源[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]中定义的名称为“kafkaTemplate”的bean创建错误:通过方法“kafkaTemplate”表达的依赖关系不满足参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有“org.springframework.kafka.core.ProducerFactory”类型的合格 bean 可用:预计至少有 1 个有资格作为自动装配候选者的 bean。依赖注释:{}
在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
在 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
在 org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
...省略了22个常用框架
原因:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用的“org.springframework.kafka.core.ProducerFactory”类型的合格bean:预计至少有1个有资格作为自动装配候选者的bean。依赖注释:{}
在 org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
...省略了40个常用框架
我的 Kafka 配置如下
@Configuration
public class KafkaEventConfig {
private final KafkaProperties kafkaProperties;
@Value("${client.id}")
private String clientId;
@Value("${topic.movie.name}")
private String movieTopicName;
@Value("${retry.backoff.ms}")
private int retryBackoffMilliseconds;
@Value("${request.timeout.ms}")
private int requestTimeoutMilliseconds;
public KafkaEventConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ProducerFactory<String, Movie> producerFactory() {
Map<String, Object> props = kafkaProperties.buildProducerProperties();
populateCommonProperties(props);
return new DefaultKafkaProducerFactory<>(props);
}
private void populateCommonProperties(Map<String, Object> props) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
}
@Bean
public KafkaProducer<String, Movie> movieKafkaProducer() {
return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
}
@Bean
public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
MeterRegistry registry) {
return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
}
我的 Kafka 回调如下
@Slf4j
public class KafkaProducerCallBack<K, V> implements Callback {
private ProducerRecord<K, V> producerRecord;
public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {
this.producerRecord = producerRecord;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
String topicName= metadata.topic();
long offset= metadata.offset();
if (exception != null) {
log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
}
else {
log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
}
}
}
我是这样发布消息的
movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
请注意我在 KafkaEventConfig 中添加以下行的那一刻一切正常
@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
return new KafkaTemplate<String, Movie>(producerFactory());
}