【问题标题】:Spring Kafka : Subscribe to a new Topic Pattern during RuntimeSpring Kafka:在运行时订阅新的主题模式
【发布时间】:2019-10-24 16:54:39
【问题描述】:

我正在使用注释 @KafkaListener 在我的应用程序中使用主题。我需要在运行时在已经运行的消费者中更改主题模式,以便可以使用与新模式匹配的新主题。

我尝试了以下代码,但它仍然使用与旧主题模式匹配的主题。在这里,我在应用程序启动时设置了“旧主题模式”。然后,我使用 Spring @Scheduler 每 10 秒将模式更新为“new-topic-pattern”。

Class "KafkaTopicPatternConfig.java":

@Configuration
public class KafkaTopicPatternConfig {

  @Bean
  public String kafkaTopicPattern(Environment env) {
    logger.info("Getting kafka topic pattern");
    String kafkaTopicPattern = "old-topic-pattern";
    return kafkaTopicPattern;
  }
}



Class "Consumer.java":

@Component
public class Consumer implements ConsumerSeekAware{

  @Autowired
  @Qualifier("kafkaTopicPattern")
  private String kafkaTopicPattern;


  @KafkaListener(topicPattern = "#{kafkaTopicPattern}", id = "s4federatorConsumer")
  public void processMessage(@Payload ConsumerRecord<String, Object> record,
        @Header(KafkaHeaders.OFFSET) Long offset,
        @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId) {

        //do something with the consumed message

  }


  @Scheduled(fixedDelay = 10000, initialDelay = 15000)
  public void refreshKafkaTopics() {
    logger.info("Inside scheduler to refresh kafka topics");
    this.kafkaTopicPattern = "new-topic-pattern";
    this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").stop();
    this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").start();
  }
}

【问题讨论】:

  • 有人可以帮忙吗?是否可以在运行时更改 Kafka 主题模式?

标签: java spring apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

你得到 kafkaTopicPattern 为 -

@Qualifier("kafkaTopicPattern")
private String kafkaTopicPattern;

我看到你正在更新模式,比如 -

this.kafkaTopicPattern = "new-topic-pattern";

但是如果这两个在不同的实例对象中,则注入到侦听器中的“kafkaTopicPattern”的原始值不会被刷新。因此,您必须确保使用新模式刷新侦听器对象。

【讨论】:

  • 被注入的“kafkaTopicPattern”的值实际上被这段代码刷新了。我试图在“processMessage”方法中打印值,我得到了“kafkaTopicPattern”的旧值和新值。但是 Kafka 消费者在重启后仍然没有消费旧模式,尽管“kafkaTopicPattern”的值已经更新为新模式。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-09
  • 2018-01-19
  • 1970-01-01
  • 2020-06-22
  • 2021-04-26
  • 1970-01-01
相关资源
最近更新 更多