【发布时间】:2019-10-24 16:54:39
【问题描述】:
我正在使用注释 @KafkaListener 在我的应用程序中使用主题。我需要在运行时在已经运行的消费者中更改主题模式,以便可以使用与新模式匹配的新主题。
我尝试了以下代码,但它仍然使用与旧主题模式匹配的主题。在这里,我在应用程序启动时设置了“旧主题模式”。然后,我使用 Spring @Scheduler 每 10 秒将模式更新为“new-topic-pattern”。
Class "KafkaTopicPatternConfig.java":
@Configuration
public class KafkaTopicPatternConfig {
@Bean
public String kafkaTopicPattern(Environment env) {
logger.info("Getting kafka topic pattern");
String kafkaTopicPattern = "old-topic-pattern";
return kafkaTopicPattern;
}
}
Class "Consumer.java":
@Component
public class Consumer implements ConsumerSeekAware{
@Autowired
@Qualifier("kafkaTopicPattern")
private String kafkaTopicPattern;
@KafkaListener(topicPattern = "#{kafkaTopicPattern}", id = "s4federatorConsumer")
public void processMessage(@Payload ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId) {
//do something with the consumed message
}
@Scheduled(fixedDelay = 10000, initialDelay = 15000)
public void refreshKafkaTopics() {
logger.info("Inside scheduler to refresh kafka topics");
this.kafkaTopicPattern = "new-topic-pattern";
this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").stop();
this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").start();
}
}
【问题讨论】:
-
有人可以帮忙吗?是否可以在运行时更改 Kafka 主题模式?
标签: java spring apache-kafka kafka-consumer-api spring-kafka