【问题标题】:Add partitions for Kafka topic dynamically using Spring Boot?使用 Spring Boot 为 Kafka 主题动态添加分区?
【发布时间】:2023-02-04 03:17:56
【问题描述】:

我能够检查其分区的特定主题:

public  void addPartitionIfNotExists(int partitionId){
    Map<String, TopicDescription> games = kafkaAdmin.describeTopics("games");
    TopicDescription gamesTopicDescription = games.get("games");
    List<TopicPartitionInfo> partitionsInfo = gamesTopicDescription.partitions();
    boolean partitionIdExists = partitionsInfo.stream().anyMatch(partitionInfo -> partitionInfo.partition() == partitionId);
    if (!partitionIdExists){
        //missing part
    }
}

但是我无法在运行时将新分区添加到已经存在的主题中。不知道这是否可能。

【问题讨论】:

    标签: spring-boot apache-kafka spring-kafka


    【解决方案1】:

    有关详细信息,请参阅KafkaAdminOperationsJavadocs:

    /**
     * Create topics if they don't exist or increase the number of partitions if needed.
     * @param topics the topics.
     */
    void createOrModifyTopics(NewTopic... topics);
    

    不过,不确定你的逻辑是否围绕partitionIdExists,因为 Kafka 主题中的分区只是一个索引号。所以,如果有分区3,并不意味着没有分区12。因此,NewTopic API 与numPartitions 一样简单。而已。

    从技术上讲,您要问的只是 createOrModifyTopics() 涵盖的内容,仅此而已:您不需要自己检查主题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-11
      • 2022-01-13
      • 1970-01-01
      • 1970-01-01
      • 2020-01-25
      • 1970-01-01
      • 2022-01-02
      • 2016-10-01
      相关资源
      最近更新 更多