【发布时间】:2022-01-28 04:36:00
【问题描述】:
我遇到一种情况,其中 1 个线程正在快速消耗来自 Kafka 主题的消息,并将它们放入阻塞队列,然后在另一个线程中消耗,将批量插入写入 mongo 数据库集合。我没有看到很多答案,因为这是一个常见问题,我的应用程序崩溃了,因为消息 q 变得如此之大并且内存不足,因为 mongo db writer 线程无法跟上消息消耗率。
配置kafka消费者暂停消息消费一段时间直到消息q恢复到合理大小的正确方法是什么。我可以在泳池循环中暂停一下吗?我不这么认为,否则消费者将被标记为不在线,我是否可以在每次消息 q 变得太大时关闭 Kafka 消费者,然后在它恢复到可管理的大小时重新连接?我可以,但这似乎不是一个干净的解决方案,我正在寻找的是说“嘿,kafka,请暂停向我的活跃消费者发送消息,直到我告诉你恢复”,因为这将使我能够以最快的速度提取消息我可以将它们插入到我的数据存储中。
请帮忙!
【问题讨论】:
标签: java apache-kafka