【问题标题】:python KafkaConsumer and Producer at the same timepython同时使用KafkaConsumer和Producer
【发布时间】:2019-09-16 14:35:16
【问题描述】:

在 python 程序中,我想向 Kafka 写入一些消息,然后使用来自远程应用程序的相同数量的消息读取不同主题的响应。问题是,当我完成发送消息时,另一端已经开始响应,当我开始阅读时,我只收到消息批次的尾随部分,或者根本没有消息,具体取决于时间。这与我对包的理解相矛盾,即我认为如果我使用auto_offset_reset='latest' 创建一个消费者并订阅一个主题,那么它会记住订阅时的偏移量,当我迭代消费者对象时,它将开始从该偏移量读取消息。

这是我的工作:

我先创建一个消费者,然后订阅 out 主题:

consumer = KafkaConsumer(
  bootstrap_servers=host+':'+broker_port, 
  group_id = "0", 
  auto_offset_reset='latest',
  consumer_timeout_ms=10000
)
consumer.subscribe(topics=(topic_out))

然后创建一个生产者并向topic_in发送消息:

producer = KafkaProducer(
   bootstrap_servers=host+':'+broker_port
)
future = producer.send(topic,json.dumps(record).encode('utf-8'))
future.get(timeout=5)

然后我开始从消费者那里阅读:

results = []
for msg in consumer:
   message = json.loads(msg.value)
   results.append(message)

我在发送前尝试了 consumer.pause(),发送后尝试了 consumer.resume() - 没有帮助。

是我在配置中遗漏了什么,还是我误解了 Consumer 的工作原理?

提前致谢!

【问题讨论】:

  • 在生产者发送数据之前,您的消费者是否已启动并运行?如果我理解正确,你是说消费者没有消费这些消息
  • 对不起,如果不清楚:C 和 P 都在同一个程序中:。我先实例化一个 C,然后是 P,然后用 P 发送消息,然后用之前实例化的 C 读取消息
  • 然后改这个值group_id = "0"然后重启进程

标签: apache-kafka kafka-consumer-api kafka-python


【解决方案1】:

听起来你有竞争条件。

一种解决方案是存储通过使用此“查找主题”构建的本地字典或 sqlite 表,然后当您从“操作主题”中使用时,您是在本地进行查找,而不是启动消费者来扫描您需要的数据的“查找主题”。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-02-09
    • 2020-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多