【问题标题】:kafka-python: I am unable to run the example producer codekafka-python:我无法运行示例生产者代码
【发布时间】:2021-04-13 10:02:19
【问题描述】:

我在 Google Cloud VM 上创建了 Kafka 集群 首先,我使用 cli 命令测试了我的代理以生成消息: 制作人:

$ kafka-console-producer.sh --broker-list localhost:9092 --producer.config /opt/bitnami/kafka/conf/producer.properties --topic lus_topic
>abc

消费者成功接收:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lus_topic --consumer.config /opt/bitnami/kafka/conf/consumer.properties --from-beginning
abc

然后我尝试使用带有 cli 消费者的 kafka-python 生产者来检索主题

Python 3.7.3 (default, Jan 22 2021, 20:04:44) 
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')        
>>> producer.send('lus_topic', b'Hello, World!').get(timeout=30)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/lumo_gftdevgcp_com/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/home/lumo_gftdevgcp_com/.local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 703, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

你能帮我解释一下为什么我会收到这个超时错误吗?如何调试此问题。

非常感谢

【问题讨论】:

  • 能否检查一下主题是否是在9092端口创建的?如果不是9092,一般会导致TimeoutError
  • 感谢@sotmot。我认同。正如我使用 cli kafka-console-producer.sh --broker-list localhost:9092 --producer.config /opt/bitnami/kafka/conf/producer.properties --topic lus_topic 测试过的那样
  • 运行 cli 工具时,在这两种情况下,您都通过 --producer.config--consumer.config 提供了额外的配置。我猜你在这些文件中有连接设置也需要传递给 Python 客户端
  • 谢谢@Mickael,我在配置中有以下设置:bootstrap.servers=localhost:9092 compression.type=none security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
  • 我尝试过 producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 0), security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN') 但还是一样错误(KafkaTimeoutError)

标签: python apache-kafka kafka-python


【解决方案1】:

我通过提供 sasl 用户名/密码解决了这个问题:

>>> producer = KafkaProducer(bootstrap_servers='localhost:9092',security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN', sasl_plain_username='user', sasl_plain_password='GGGGGG')
>>>producer.bootstrap_connected()
True
>>> producer.send('lus_topic', b'Hello, World!')
<kafka.producer.future.FutureRecordMetadata object at 0x7fe3eb8ebbe0>

【讨论】:

    猜你喜欢
    • 2013-03-02
    • 1970-01-01
    • 2016-06-11
    • 2018-08-04
    • 2018-07-26
    • 2020-06-25
    • 1970-01-01
    • 2017-01-26
    • 1970-01-01
    相关资源
    最近更新 更多