【问题标题】:Correct way of throttling kafka consumer messages in java在java中限制kafka消费者消息的正确方法
【发布时间】:2022-01-28 04:36:00
【问题描述】:

我遇到一种情况,其中 1 个线程正在快速消耗来自 Kafka 主题的消息,并将它们放入阻塞队列,然后在另一个线程中消耗,将批量插入写入 mongo 数据库集合。我没有看到很多答案,因为这是一个常见问题,我的应用程序崩溃了,因为消息 q 变得如此之大并且内存不足,因为 mongo db writer 线程无法跟上消息消耗率。

配置kafka消费者暂停消息消费一段时间直到消息q恢复到合理大小的正确方法是什么。我可以在泳池循环中暂停一下吗?我不这么认为,否则消费者将被标记为不在线,我是否可以在每次消息 q 变得太大时关闭 Kafka 消费者,然后在它恢复到可管理的大小时重新连接?我可以,但这似乎不是一个干净的解决方案,我正在寻找的是说“嘿,kafka,请暂停向我的活跃消费者发送消息,直到我告诉你恢复”,因为这将使我能够以最快的速度提取消息我可以将它们插入到我的数据存储中。

请帮忙!

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    kafka api中有一个暂停和恢复的方法 https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)

    如果您检查“消费流控制”部分,它会说明以下内容:

    Kafka 支持动态控制消费流,通过使用 pause(Collection) 和 resume(Collection) 在未来的 poll(long) 调用中分别暂停指定分配分区上的消费和恢复指定暂停分区上的消费。

    【讨论】:

    • 非常好,我所要做的就是查看 javadoc!谢谢
    猜你喜欢
    • 1970-01-01
    • 2017-09-23
    • 1970-01-01
    • 2018-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-21
    • 1970-01-01
    相关资源
    最近更新 更多