【发布时间】:2020-10-28 23:11:36
【问题描述】:
我已经在 MSK 上创建了一个 Kafka 集群,现在我正在尝试使用 python 连接到集群。
我写了这个短代码:
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['host1:9092', 'host2:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
api_version=(2, 4, 1)
)
producer.send('test', value={'hello':'world'})
问题是每次我运行它都会出现这个错误:
KafkaTimeoutError: Failed to update metadata after 60.0 secs.
我认为这可能与 Kafka 创建主题有关,因此我将这一行添加到配置中。
auto.create.topics.enable=true
但我仍然遇到同样的错误。
这是我的完整配置文件:
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
socket.request.max.bytes=104857600
unclean.leader.election.enable=true
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=5000
我在这里缺少什么?我在某处读到可能与 SSL 身份验证有关的内容,但在任何步骤中,都没有任何 .pem 文件、.ca 文件或类似的文件。
【问题讨论】:
-
你能分享你的配置吗? (有兴趣看
advertised.listeners) -
我将配置添加到问题正文中。但我认为 AWS MSK 的全部意义在于它是托管的,我不必处理这种配置。
-
另外,添加
auto.create.topics.enable=true后是否重启了集群? -
我实际上是 MSK 的新手,但我相信它会自行重启,因为更新过程大约需要 5 到 10 分钟
-
每当我在 AWS 中遇到连接超时,我首先想到的是安全组。 MSK 集群的安全组是否允许从应用程序的安全组进入 tcp 端口 9092?
标签: python apache-kafka kafka-consumer-api kafka-producer-api aws-msk