【发布时间】:2019-09-16 14:35:16
【问题描述】:
在 python 程序中,我想向 Kafka 写入一些消息,然后使用来自远程应用程序的相同数量的消息读取不同主题的响应。问题是,当我完成发送消息时,另一端已经开始响应,当我开始阅读时,我只收到消息批次的尾随部分,或者根本没有消息,具体取决于时间。这与我对包的理解相矛盾,即我认为如果我使用auto_offset_reset='latest' 创建一个消费者并订阅一个主题,那么它会记住订阅时的偏移量,当我迭代消费者对象时,它将开始从该偏移量读取消息。
这是我的工作:
我先创建一个消费者,然后订阅 out 主题:
consumer = KafkaConsumer(
bootstrap_servers=host+':'+broker_port,
group_id = "0",
auto_offset_reset='latest',
consumer_timeout_ms=10000
)
consumer.subscribe(topics=(topic_out))
然后创建一个生产者并向topic_in发送消息:
producer = KafkaProducer(
bootstrap_servers=host+':'+broker_port
)
future = producer.send(topic,json.dumps(record).encode('utf-8'))
future.get(timeout=5)
然后我开始从消费者那里阅读:
results = []
for msg in consumer:
message = json.loads(msg.value)
results.append(message)
我在发送前尝试了 consumer.pause(),发送后尝试了 consumer.resume() - 没有帮助。
是我在配置中遗漏了什么,还是我误解了 Consumer 的工作原理?
提前致谢!
【问题讨论】:
-
在生产者发送数据之前,您的消费者是否已启动并运行?如果我理解正确,你是说消费者没有消费这些消息
-
对不起,如果不清楚:C 和 P 都在同一个程序中:。我先实例化一个 C,然后是 P,然后用 P 发送消息,然后用之前实例化的 C 读取消息
-
然后改这个值
group_id = "0"然后重启进程
标签: apache-kafka kafka-consumer-api kafka-python