【问题标题】:Confluent_kafka Producer does not publish messages into topicConfluent_kafka Producer 不向主题发布消息
【发布时间】:2020-04-29 00:19:25
【问题描述】:

我尝试在我的 Raspberry 上安装 Kafka。并在“hello-kafka”主题上进行测试:

~ $ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-kafka
>Test message 1
>Test message 2
>Test message 3
>^Z
[4]+  Stopped                 /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-kafka
$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello-kafka --from-beginning
Test message 1
Test message 2
Test message 3
^CProcessed a total of 3 messages

然后我尝试检查服务器是否在另一台机器上工作。 检查动物园管理员:

(venv)$ telnet 192.168.1.10 2181
Trying 192.168.1.10...
Connected to 192.168.1.10.
Escape character is '^]'.
srvr
Zookeeper version: 3.6.0--b4c89dc7f6083829e18fae6e446907ae0b1f22d7, built on 02/25/2020 14:38 GMT
Latency min/avg/max: 0/0.8736/59
Received: 10146
Sent: 10149
Connections: 2
Outstanding: 0
Zxid: 0x96
Mode: standalone
Node count: 139
Connection closed by foreign host.

还有卡夫卡:

(venv) $ telnet 192.168.1.10 9092
Trying 192.168.1.10...
Connected to 192.168.1.10.
Escape character is '^]'.
tets
Connection closed by foreign host.

然后我写了一个 Python 脚本:

# -*- coding: utf-8 -*-

from confluent_kafka import Producer


def callback(err, msg):
    if err is not None:
        print(f'Failed to deliver message: {str(msg)}: {str(err)}')
    else:
        print(f'Message produced: {str(msg)}')


config = {
            'bootstrap.servers': '192.168.1.10:9092'
        }

producer = Producer(config)
producer.produce('hello-kafka', value=b"Hello from Python", callback=callback)
producer.poll(5)

有脚本输出(没有任何打印):

(venv) $ python kafka-producer.py 
(venv) $ python kafka-producer.py 
(venv) $ 

Kafka 中没有新消息:

$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello-kafka --from-beginning
Test message 1
Test message 2
Test message 3
^CProcessed a total of 3 messages
$ ^C

谁能告诉我我做错了什么?

【问题讨论】:

    标签: python apache-kafka confluent-platform


    【解决方案1】:

    正确的解决方法是更新server.properties 中的代理配置,以正确设置广告侦听器。如果您的客户端无法解析raspberrypi,则将广告侦听器更改为您的客户端可以访问的内容,即IP地址:

    advertised.listeners=PLAINTEXT://192.168.1.10:9092
    

    在您的客户端上更改 /etc/hosts 文件是一种解决方法,对于使用 Raspberry Pi 的测试项目来说是可以的,但不鼓励作为一般的最佳实践(因为客户端一旦移动到另一台机器上就会中断哪个没有/etc/hosts hack)

    【讨论】:

    • 谢谢你,罗宾!我在您的解释后进行了更改,现在可以正常工作了。
    【解决方案2】:

    我打开日志并看到下一条消息:

    WARNING:kafka.conn:DNS lookup failed for raspberrypi:9092, exception was [Errno 8] nodename nor servname provided, or not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
    ERROR:kafka.conn:DNS lookup failed for raspberrypi:9092 (AddressFamily.AF_UNSPEC)
    

    然后我在客户端机器上的 /etc/hosts 中添加了下一个字符串:

    192.168.1.10 raspberrypi
    

    它完全解决了这种情况。

    【讨论】:

      猜你喜欢
      • 2019-02-14
      • 2020-08-18
      • 2018-10-27
      • 2020-07-08
      • 1970-01-01
      • 2023-03-17
      • 1970-01-01
      • 1970-01-01
      • 2019-11-05
      相关资源
      最近更新 更多