【发布时间】:2021-04-13 10:02:19
【问题描述】:
我在 Google Cloud VM 上创建了 Kafka 集群 首先,我使用 cli 命令测试了我的代理以生成消息: 制作人:
$ kafka-console-producer.sh --broker-list localhost:9092 --producer.config /opt/bitnami/kafka/conf/producer.properties --topic lus_topic
>abc
消费者成功接收:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lus_topic --consumer.config /opt/bitnami/kafka/conf/consumer.properties --from-beginning
abc
然后我尝试使用带有 cli 消费者的 kafka-python 生产者来检索主题
Python 3.7.3 (default, Jan 22 2021, 20:04:44)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('lus_topic', b'Hello, World!').get(timeout=30)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/lumo_gftdevgcp_com/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 576, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/home/lumo_gftdevgcp_com/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 703, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
你能帮我解释一下为什么我会收到这个超时错误吗?如何调试此问题。
非常感谢
【问题讨论】:
-
能否检查一下主题是否是在9092端口创建的?如果不是9092,一般会导致
TimeoutError。 -
感谢@sotmot。我认同。正如我使用 cli kafka-console-producer.sh --broker-list localhost:9092 --producer.config /opt/bitnami/kafka/conf/producer.properties --topic lus_topic 测试过的那样
-
运行 cli 工具时,在这两种情况下,您都通过
--producer.config和--consumer.config提供了额外的配置。我猜你在这些文件中有连接设置也需要传递给 Python 客户端 -
谢谢@Mickael,我在配置中有以下设置:bootstrap.servers=localhost:9092 compression.type=none security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
-
我尝试过 producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 0), security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN') 但还是一样错误(KafkaTimeoutError)
标签: python apache-kafka kafka-python