【问题标题】:Python KafkaConsumer not connectingPython KafkaConsumer 未连接
【发布时间】:2020-05-14 11:35:34
【问题描述】:

设置:

我有 3 个码头集装箱

1) For Kafka
2) For Zookeeper
3) For JupyterLab

我在这些容器之间设置了网络,我看到 kafka 生产者能够运行并生成数据。

KafkaProducer.ipynb

KAFKA_BROKER = ['172.20.0.2:9093']
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER)

for _ in range(100):
    print("sending")
    producer.send('my-topic', key=b'foo', value=b'bar')
    print("success")

这里 send() 发送消息 100 次。

KafkaConsumer.ipynb

KAFKA_BROKER = ['172.20.0.2:9093']
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=KAFKA_BROKER)

print("Comm success")

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

在上面的消费者代码中,print("Comm success") 行永远不会被执行。基于生产者代码执行,网络是开放的,jupyter 能够与 kafka 代理对话。但是,客户端无法连接到同一个代理进行数据消费。如何开始调试?

【问题讨论】:

    标签: docker apache-kafka jupyter-notebook kafka-consumer-api kafka-producer-api


    【解决方案1】:

    默认auto.offset.reset的值为latest,所以用新的group.id将它设置为earliest

    consumer = KafkaConsumer('my-topic',group_id='new-group',auto_offset_reset = 'earliest',bootstrap_servers=KAFKA_BROKER)
    

    【讨论】:

      猜你喜欢
      • 2020-12-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-09
      • 1970-01-01
      • 2021-10-28
      相关资源
      最近更新 更多