【问题标题】:Dont receive message with confluent kafka simple producer/consumer examples?没有收到带有融合 kafka 简单生产者/消费者示例的消息?
【发布时间】:2017-06-13 16:09:57
【问题描述】:

我运行kafka_2.11-0.10.1.1confluent-kafka-0.9.2(主分支)python 绑定,它使用librdkafka-0.9.2。我的机器运行 ubuntu-16.04 x86_64。我在端口2181 上运行zookeeper-3.4.8-1。我像这样运行融合 producer 示例:

$ cd confluent-kafka-python/examples
$ python producer.py localhost:9095 confluent-01
first message
2nd msg

消费者

$ python consumer.py localhost:9095 confluentgroup confluent-01

一切都在我的机器上本地运行,不运行任何防火墙。

备注

  • 主题在 Zookeeper 上创建成功
  • 代理成功接收到生产者消息:
  • 消费者设置以下conf {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, 'api.version.request': True }
  • 一开始producer/consumer 工作了一段时间,直到我在生产者方面得到Receive failed: Disconnected。摘录:

$ python producer.py  localhost:9095 confluent-02
asd
% Message delivered to confluent-02 [0]
1234123
890890
% Message delivered to confluent-02 [0]
%3|1485791262.420|FAIL|rdkafka#producer-1| [thrd:obscura.ax.example.com:9095/3]: obscura.ax.example.com:9095/3: Receive failed: Disconnected

问题:一段时间后我在消费者方面没有得到任何东西

问题:

  1. 我做错了什么?
  2. 如何验证代理端已收到生产者消息? 生产者消息在代理端正确接收。
  3. 如何调试消费者端? 我将'debug': "cgrp, topic, fetch" 添加到消费者配置中。我在哪里可以阅读日志?

【问题讨论】:

  • 您可以尝试将 localhost 替换为您正在运行它的机器的实际 iphostname 吗?
  • @Gondola_Ride 代理成功地从生产者(从日志)获取消息。我的问题是消费者什么都不做。
  • @MatthiasJ.Sax librdkafka 向后兼容旧代理,所以应该没问题。
  • @m-ric 如果在消费者运行时产生新消息会发生什么?如果您收到新消息,则意味着该组已提交偏移量,因此它不会在分区的开头开始读取 (auto.offset.reset=smallest)。如果您没有收到新消息,则意味着其他错误:连接或分配。您可以通过将 debug 配置属性设置为 broker,cgrp,topic 来调试两者
  • 查看这些文档了解 kafka 消费者如何工作,注意已提交的偏移量和 auto.offset.reset:docs.confluent.io/3.1.2/clients/consumer.html

标签: python apache-kafka apache-zookeeper confluent-platform


【解决方案1】:

我有两个建议:

1) 尝试将选项 --from-beginning 添加到消费者命令

2) 代理的默认端口是 9092,因此请检查要使用的正确端口

希望这会有所帮助。

【讨论】:

  • 1) examples/consumer.py 已经定义了'auto.offset.reset': 'smallest' 2) 生产者在 localhost:9093 上成功发送消息
  • auto.oofset.reset 是 Kafka 在没有提交偏移量时应用的策略。您的消费者没有消费,因为它已经阅读了所有消息,因此 Kafka shell 中的选项 --from-beginning 告诉消费者从头开始消费。您可以更改消费者组以读取所有消息。
【解决方案2】:

我最终成功了。最初我运行confluent-kafka tutorial,其中:

  • 不捕获ctrl+c SIGINT 信号,
  • poll() 中时不会超时

在消费者代码中。因此我必须在我的 linux 机器上 ctrl+z 然后 kill %1 它。我相信这个终止并没有关闭套接字,它保持打开了一段时间(TIME_WAIT)。然后当我重新启动消费者时,它会从旧套接字中取出垃圾并卡住。

我添加了try: [...] except KeyboardInterrupt: consumer.close() 来捕获ctrl+c 并干净地关闭套接字。并且不再面临这个问题。

我希望这对将来的某人有所帮助。

【讨论】:

    猜你喜欢
    • 2017-11-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 2020-11-15
    相关资源
    最近更新 更多