【发布时间】:2022-02-24 23:50:20
【问题描述】:
我正在使用 confluent-kafka-python 包与 Kafka 服务器进行交互。我可以成功创建主题并将事件推送给它。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。
from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})
# First check here if the topic already exists!
if not topic_exists(topic): # <-- how to accomplish this?
new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
results = kafka_admin.create_topics([new_kafka_topic])
感谢您的帮助!
【问题讨论】:
标签: python apache-kafka confluent-platform confluent-kafka-python