【问题标题】:How to determine if a kafka topic exists using confluent-kafka-python如何使用 confluent-kafka-python 确定是否存在 kafka 主题
【发布时间】:2022-02-24 23:50:20
【问题描述】:

我正在使用 confluent-kafka-python 包与 Kafka 服务器进行交互。我可以成功创建主题并将事件推送给它。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。

from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})

# First check here if the topic already exists!
if not topic_exists(topic):  # <-- how to accomplish this?
    new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
    results = kafka_admin.create_topics([new_kafka_topic])

感谢您的帮助!

【问题讨论】:

    标签: python apache-kafka confluent-platform confluent-kafka-python


    【解决方案1】:

    我遇到了同样的问题,我通过以下方式解决了它:

    client = AdminClient({"bootstrap.servers": BROKER_URL})
    topic_metadata = client.list_topics()
    if topic_metadata.topics.get(self.topic_name) is None:
      self.create_topic()
    

    【讨论】:

    • 什么是self.create_topic()
    【解决方案2】:

    AdminClient 类的 list_topics 方法允许传递您要检查的主题名称,因此您无需阅读现有主题的完整(可能很大)列表:

    list_topics([topic=None][, timeout=-1])
    

    文档:https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#id0

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-07-21
      • 2015-09-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-12
      • 2019-09-23
      相关资源
      最近更新 更多