【发布时间】:2020-03-11 10:05:45
【问题描述】:
我的应用程序正在监听多个 kafka 主题。现在,我的监听器如下所示,属性文件中的my.topics 包含逗号分隔的 kafka 主题列表
@KafkaListener(topics = ["#{'\${my.topics}'.split(',')}"], groupId = "my.group", containerFactory = "myKafkaFactory")
fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) {
//do Something with myRequest
ack.acknowledge()
}
我的ConcurrentKafkaListenerContainerFactory 是
@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
return factory
}
有没有一种方法可以为每个主题动态创建一个单独的消费者,这样当我在my.topics 的列表中添加一个主题时,spring 会自动为该主题创建一个单独的消费者。
我现在面临的问题是,如果任何主题中的一条消息出现问题,其他主题中的消息也会受到影响。
在高层次上,我正在寻找类似的东西
@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
factory.isOneConsumerPerTopic(true)
return factory
}
这样factory.isOneConsumerPerTopic(true) 将确保为数组中的每个主题创建一个单独的消费者。
我确实通过了How to create separate Kafka listener for each topic dynamically in springboot?。我正在寻找更“清洁”的解决方案。 :)
【问题讨论】:
标签: spring spring-boot kotlin apache-kafka spring-kafka