【问题标题】:How to set topic name in KafkaListener by code?如何通过代码在KafkaListener中设置主题名称?
【发布时间】:2020-03-28 06:28:55
【问题描述】:

我正在为 kafka 主题编写监听器,并使用 @kafkaListener 注释来做同样的事情。到目前为止,我对主题名称(topic1)进行了硬编码,并且工作正常。这是一个工作代码:-

@Component
public class KafkaConsumer {

@Autowired
private KafkaProperties kafkaProps;


@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;

@KafkaListener(topics = "topic1", containerFactory = "createPokafkaListenerContainerFactory")
public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
    throws JsonProcessingException {

    LOG.info("Received message for po create from kafka topic {} is {}",
        kafkaProps.getOmsCreateTopicName(), objectMapper
            .writerWithDefaultPrettyPrinter().writeValueAsString(po));

    ack.acknowledge();


}

}

现在,当我尝试更改代码以从代码中获取主题名称时,它不起作用。在堆栈溢出(How to pass dynamic topic name to @kafkalistener(topics from environment variable)中关注此页面后,我试图从代码中获取值的代码是:-

@Component
public class KafkaConsumer {

@Autowired
private KafkaProperties kafkaProps;

public KafkaProperties getKafkaProps() {
    return kafkaProps;
}

@Autowired
@Qualifier("CustomObjectMapper")
private ObjectMapper objectMapper;

@KafkaListener(topics = "#{__listener.kafkaProps.getOmsCreateTopicName()}", containerFactory = "omsCreatePokafkaListenerContainerFactory")
    public void CreatePoListener(PurchaseOrder po, Acknowledgment ack)
    throws JsonProcessingException {

    LOG.info("Received message for po create from kafka topic {} is {}",
        kafkaProps.getOmsCreateTopicName(), objectMapper
            .writerWithDefaultPrettyPrinter().writeValueAsString(po));

    ack.acknowledge();


}

}

当我试图带上服务器时,它会抛出错误:-

bean初始化失败;嵌套异常是 org.springframework.beans.factory.BeanExpressionException:表达式解析失败;嵌套异常是 org.springframework.expression.spel.SpelEvaluationException:EL1008E:在“org.springframework.beans.factory.config.BeanExpressionContext”类型的对象上找不到属性或字段“__listener” - 可能不公开?

有人可以帮我在这里做错什么吗?

【问题讨论】:

  • 你用的是什么版本?该功能是在 2.1.2 中添加的。
  • @GaryRussell 如果您询问 spring-kafka,我们使用的是 1.1.3.RELEASE。让我增加版本再试一次
  • 是的,我的意思是 spring-kafka;当前版本是 2.3.3。 1.1.x 很久不支持了。
  • @GaryRussell 因为这是一个很小的旧项目,我们使用的是 spring 4.3.x 版本。而且我猜 Spring 5 支持 2.1.2。由于 Spring 4 和 Spring-kafka-2.1.x 之间的兼容性问题,我没有找到 class def found 错误。有什么办法可以在春季 4 保持 spring-kafka 1.1.x 实现这一目标?

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

1.1.x 不再受支持;如果您无法升级到 Spring Framework 4.3 以上,则应升级到 1.3.10,这是与 Spring 4.3 一起使用的最新版本;得益于 KIP-62,它比 1.1.x 拥有更简单可靠的线程模型。

虽然您不能在 1.3.x 中使用 __listener,但您可以使用 SpEL 表达式直接从 KafkaProperties bean 获取主题:

@KafkaListener(topics = "#{kafkaProps.omsCreateTopicName}" ...)

【讨论】:

  • 谢谢加里。我进行了建议的更改(将 1.1.x 升级到 1.3.10)并如上所述使用 SpEl,但我收到此错误:-“bean 初始化失败;嵌套异常是 org.springframework.beans.factory.BeanExpressionException:表达式“
  • 鉴于您的问题中有@Autowired private KafkaProperties kafkaProps;,我假设您的应用程序上下文中有这样一个bean。如果它有不同的 bean 名称,则必须在表达式中使用正确的 bean 名称。
  • 我将变量名从 kafkaProps 更改为 kafkaProperties 并开始工作。非常感谢您的帮助。
【解决方案2】:

在 Consumer 类中,您需要进行以下更改:

@Autowired
public KafkaProperties kafkaProps;

@KafkaListener(topics = "#{kafkaConsumer.kafkaProps.getOmsCreateTopicName()}"

它对我有用。

【讨论】:

    猜你喜欢
    • 2012-09-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-09-22
    • 1970-01-01
    • 2011-06-05
    • 2017-12-22
    • 2020-01-17
    相关资源
    最近更新 更多