【问题标题】:Create a Kafka topic if it does not already exist with Python-kafka admin client?如果 Python-kafka 管理客户端尚不存在 Kafka 主题,请创建它?
【发布时间】:2020-08-18 08:30:49
【问题描述】:

使用 Python-kafka 管理客户端,我知道如何创建和删除主题。

但是有没有办法检查该主题是否存在?如果不是那么只有我想创建它。 如何在 python -kafka 管理客户端中做到这一点? 我没有在文档中看到任何内容。

【问题讨论】:

  • 你可以使用kafka脚本吗? kafka-topic.sh --list
  • 我需要python脚本
  • Idk 但您可能可以从任何 python 脚本调用kafka-topic.sh --list,然后解析输出。或者更好地使用kafkaConsumer.topics() ref:kafka-python.readthedocs.io/en/1.0.2/apidoc/…
  • 我不确定如何在 python 脚本中调用 kafka-topic.sh --list。你能告诉我吗?

标签: python-3.x apache-kafka kafka-python


【解决方案1】:

创建一个KafkaConsumer object指定可选输入参数topics,然后在创建的对象上调用.topics()

【讨论】:

    【解决方案2】:

    如果您尝试创建已经存在的主题,则 Kafka 管理客户端会引发异常 TopicAlreadyExistsError,以便您可以像我一样在代码中处理该异常...

    让我为您提供我为我的用例构建的示例代码

    def create_topic(self, topic_name=None, num_partitions=1, replication_factor=1):
        try:
            topic_list = []
            topic_list.append(NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor))
            self.admin_client.create_topics(new_topics=topic_list, validate_only=False)
            self.show_topic_on_console()
            return True
        except TopicAlreadyExistsError as err:
            print(f"Request for topic creation is failed as {topic_name} is already created due to {err}")
            return False
        except Exception as err:
            print(f"Request for topic creation is failing due to {err}")
            return False
    

    替代流程 - 你可以像下面这样设计

    1. 提取主题列表
    2. 检查请求主题是否在列表中 与否
    3. 如果不在列表中,则创建主题

    【讨论】:

      猜你喜欢
      • 2018-10-11
      • 1970-01-01
      • 1970-01-01
      • 2022-10-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-13
      • 2018-11-27
      相关资源
      最近更新 更多