【发布时间】:2019-03-16 21:17:19
【问题描述】:
我已成功建立了 Google Cloud Platform 集群上的 Kafka 生产者和消费者之间的连接:
$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test
在新的shell中执行
$ cd /usr/lib/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test \
--from-beginning
现在,我想使用以下 python 脚本向 Kafka 生产者服务器发送消息:
from kafka import *
topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092',
api_version=(0,10))
producer.send(topic, b"Test test test")
但是,这会导致KafkaTimeoutError:
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
在网上环顾四周告诉我考虑:
- 在
/usr/lib/kafka/config/server.properties文件中取消注释listeners=...和advertised.listeners=...。
但是,listeners=PLAINTEXT://:9092 不起作用,this 帖子建议设置PLAINTEXT://<external-ip>:9092。
所以,我开始想知道如何通过 GCP 集群的外部(静态)IP 地址访问 Kafka 服务器。然后,我们设置了防火墙规则来访问端口(?)并允许 https 访问集群。但我不确定这是否是问题的过度杀伤力。
我肯定需要一些指导才能从 python 脚本成功连接到 Kafka 服务器。
【问题讨论】:
标签: python google-cloud-platform apache-kafka streaming