【问题标题】:Spring Kafka limiting number of messages to be consumed in a time periodSpring Kafka 限制一段时间内要消耗的消息数
【发布时间】:2021-04-08 21:50:48
【问题描述】:

我有一个 Spring Boot 应用程序,它使用 Spring Kafka 监听 Kafka 主题。消息被消费后,会使用消息中的信息执行几个 web/rest 服务调用以收集一些其他数据,这个过程需要一些时间,正如预期的那样。因此,我使用了一个大小为 20 的线程池来创建并行消息处理。

该系统通常运行良好,但很少有大量消息(约 200K)在短时间内(1 秒)内放入/生成到 Kafka 主题。在这种情况下,消费者立即消费消息,但消息处理机制不够快。因此,所有消耗的消息在等待线程时都保留在内存中,并且应用程序会出现 OutOfMemoryError。

将线程池大小增加到某个点可能是一种改进,但它不是解决此问题的永久解决方案。我想在一段时间内消耗的消息数和处理的消息数之间建立平衡。这可以限制从 Kafka 主题消费的消息数量,或者在有可能立即处理消息时消费它。

是否有任何 Kafka 消费者配置来限制一段时间内的消息数量?在消息消费的延迟不成问题的情况下,如何优化消费和处理机制?

PS:似乎没有配置两次后续轮询之间的时间间隔(What is the delay time between each poll),如果存在,则可能有该配置的解决方案。

这是我的消费者代码:

@Autowired
MessageProcessUtil messageProcessUtil;

private ExecutorService executor = Executors.newFixedThreadPool(20);

@KafkaListener(topics = "${kafka.consumer.topicName}")
public void consume(String message){
    logger.info(String.format("$$ -> Consumed Message -> %s",message));
    messageProcessUtil.processMessage(message, executor);
}

消费者配置:

kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.request.timeout.ms=40000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=1
kafka.consumer.fetch.max.wait.ms=500
kafka.consumer.auto.offset.reset=earliest

提前感谢您的帮助。

【问题讨论】:

    标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    我们有一个非常相似的要求,并使用了谷歌的Guava Framework 来实现速率限制。这个框架有不同的选项,比如限制每个时间段的请求数,或者允许的请求总数等。有一个很好的例子来说明如何使用它。

    Guava Rate Limiter tutorial

    【讨论】:

      【解决方案2】:

      添加对消费的Kafka记录的异步处理不是一个好主意;它会导致偏移管理问题;在@KafkaListener 上使用concurreny 来添加更多消费者(您至少需要在主题上进行那么多分区)。

      【讨论】:

        【解决方案3】:

        由于您将消息消费和消息处理分开,因此没有任何配置可以实现您想要的。

        但是您可以使用 BlockingQueue 来实现它。您设置 Queue 的最大数量,让拉线程将消息从 kafka 拉到队列,并让进程线程从队列中消费消息。当队列满时,拉取线程会阻塞并降低拉取率。

        【讨论】:

          【解决方案4】:

          Kafka消费者可以通过设置客户端配置fetch.max.bytes来选择要获取的最大字节数。

          更多详情请参考this链接。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2021-07-09
            • 1970-01-01
            • 2015-11-30
            • 1970-01-01
            • 1970-01-01
            • 2022-06-15
            相关资源
            最近更新 更多