【发布时间】:2020-03-28 06:28:55
【问题描述】:
我正在为 kafka 主题编写监听器,并使用 @kafkaListener 注释来做同样的事情。到目前为止,我对主题名称(topic1)进行了硬编码,并且工作正常。这是一个工作代码:-
@Component
public class KafkaConsumer {
@Autowired
private KafkaProperties kafkaProps;
@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;
@KafkaListener(topics = "topic1", containerFactory = "createPokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
throws JsonProcessingException {
LOG.info("Received message for po create from kafka topic {} is {}",
kafkaProps.getOmsCreateTopicName(), objectMapper
.writerWithDefaultPrettyPrinter().writeValueAsString(po));
ack.acknowledge();
}
}
现在,当我尝试更改代码以从代码中获取主题名称时,它不起作用。在堆栈溢出(How to pass dynamic topic name to @kafkalistener(topics from environment variable)中关注此页面后,我试图从代码中获取值的代码是:-
@Component
public class KafkaConsumer {
@Autowired
private KafkaProperties kafkaProps;
public KafkaProperties getKafkaProps() {
return kafkaProps;
}
@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;
@KafkaListener(topics = "#{__listener.kafkaProps.getOmsCreateTopicName()}", containerFactory = "omsCreatePokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
throws JsonProcessingException {
LOG.info("Received message for po create from kafka topic {} is {}",
kafkaProps.getOmsCreateTopicName(), objectMapper
.writerWithDefaultPrettyPrinter().writeValueAsString(po));
ack.acknowledge();
}
}
当我试图带上服务器时,它会抛出错误:-
bean初始化失败;嵌套异常是 org.springframework.beans.factory.BeanExpressionException:表达式解析失败;嵌套异常是 org.springframework.expression.spel.SpelEvaluationException:EL1008E:在“org.springframework.beans.factory.config.BeanExpressionContext”类型的对象上找不到属性或字段“__listener” - 可能不公开?
有人可以帮我在这里做错什么吗?
【问题讨论】:
-
你用的是什么版本?该功能是在 2.1.2 中添加的。
-
@GaryRussell 如果您询问 spring-kafka,我们使用的是 1.1.3.RELEASE。让我增加版本再试一次
-
是的,我的意思是 spring-kafka;当前版本是 2.3.3。 1.1.x 很久不支持了。
-
@GaryRussell 因为这是一个很小的旧项目,我们使用的是 spring 4.3.x 版本。而且我猜 Spring 5 支持 2.1.2。由于 Spring 4 和 Spring-kafka-2.1.x 之间的兼容性问题,我没有找到 class def found 错误。有什么办法可以在春季 4 保持 spring-kafka 1.1.x 实现这一目标?
标签: apache-kafka kafka-consumer-api spring-kafka