【问题标题】:Handling a producer and consumer using Kafka-python使用 Kafka-python 处理生产者和消费者
【发布时间】:2019-12-31 05:28:11
【问题描述】:

首先我想说我是 Kafka 和 stackoverflow 的新手,所以如果我没有以正确的方式问这个问题,我很抱歉。 我正在尝试使用 kafka-python 实现生产者-消费者。 但它不能正常工作

我已经安装了 zookeeper 并启动并运行。我也有kafka-server。但是当我通过pycharm运行消费者和生产者时,接收者没有收到消息。消费者继续运行,但生产者停止。

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='test-consumer-group', 
bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
                     auto_offset_reset='earliest')
print("Consuming messages")
for msg in consumer:
    print(msg)

生产者.py

from kafka import KafkaProducer

print('above producer')
producer = KafkaProducer(bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
                         compression_type=None
                         )
print('after producer')
for _ in range(100):
    producer.send('test', b'HELLO NITHIN chandran')

print('after sending messages')

在 my_ip 的位置,我从 ipconfig 提供了我的系统 IP 地址。

consumer.py 输出 -

Consuming messages

consumer.py 不会停止运行

producer.py 输出 -

above producer
after producer
after sending messages

Process finished with exit code 0

producer.py 停止运行,进程完成,如输出所示。

请帮我解决这个问题。 感谢所有帮助

【问题讨论】:

  • 欢迎来到 Stackoverflow,你能在 Kafka 的数据目录中查看主题的数据吗?请检查主题中是否有任何内容?
  • 我该怎么做? Kafka 中没有数据目录。只有 bin,config,libs,site-docs,LICENSE,NOTICE。
  • 您应该查看config/server.properties 中的配置文件。可能在/tmp 目录中。
  • 你能不能说得更具体一点。比如我应该在config/server.properties 文件中寻找什么
  • 当然。在这个文件中有一个log.dirs 配置。它指定存储主题数据的目录。在该目录中,有许多以您的主题名称命名的文件夹。找到您的主题并打开日志文件。

标签: python apache-kafka apache-zookeeper broker kafka-python


【解决方案1】:

您的代码没问题,问题在于您的代理配置。请将其设置为初始配置,只需将log.dirs更改为您要存储Kafka数据的路径即可。 更改配置文件后,请按照以下步骤操作:

  • 停止 zookeeper 和 kafka
  • 清除 kafka 和 zookeeper 数据目录
  • 运行 zookeeper 和 kafa
  • 启动消费者和生产者

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-01-07
    • 2019-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-08
    • 2019-10-16
    相关资源
    最近更新 更多