【问题标题】:How to create separate Kafka listener for each topic dynamically in springboot?如何在spring boot中为每个主题动态创建单独的Kafka监听器?
【发布时间】:2017-05-22 20:25:02
【问题描述】:
【问题讨论】:
标签:
spring
spring-mvc
spring-boot
kafka-consumer-api
spring-kafka
【解决方案1】:
如果您想使用注释指定Kafka 侦听器,则它们只是“设计时”。 Spring-kafka 也允许您动态创建它们,请参阅KafkaMessageListenerContainer。
动态创建的 Kafka 侦听器的最简单示例是:
Map<String, Object> consumerConfig = ImmutableMap.of(
BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
GROUP_ID_CONFIG, "groupId"
);
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(
consumerConfig,
new StringDeserializer(),
new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
//do something with received record
}
ConcurrentMessageListenerContainer container =
new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
container.start();
更多解释和代码见这篇博文:http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/