【发布时间】:2021-04-08 21:50:48
【问题描述】:
我有一个 Spring Boot 应用程序,它使用 Spring Kafka 监听 Kafka 主题。消息被消费后,会使用消息中的信息执行几个 web/rest 服务调用以收集一些其他数据,这个过程需要一些时间,正如预期的那样。因此,我使用了一个大小为 20 的线程池来创建并行消息处理。
该系统通常运行良好,但很少有大量消息(约 200K)在短时间内(1 秒)内放入/生成到 Kafka 主题。在这种情况下,消费者立即消费消息,但消息处理机制不够快。因此,所有消耗的消息在等待线程时都保留在内存中,并且应用程序会出现 OutOfMemoryError。
将线程池大小增加到某个点可能是一种改进,但它不是解决此问题的永久解决方案。我想在一段时间内消耗的消息数和处理的消息数之间建立平衡。这可以限制从 Kafka 主题消费的消息数量,或者在有可能立即处理消息时消费它。
是否有任何 Kafka 消费者配置来限制一段时间内的消息数量?在消息消费的延迟不成问题的情况下,如何优化消费和处理机制?
PS:似乎没有配置两次后续轮询之间的时间间隔(What is the delay time between each poll),如果存在,则可能有该配置的解决方案。
这是我的消费者代码:
@Autowired
MessageProcessUtil messageProcessUtil;
private ExecutorService executor = Executors.newFixedThreadPool(20);
@KafkaListener(topics = "${kafka.consumer.topicName}")
public void consume(String message){
logger.info(String.format("$$ -> Consumed Message -> %s",message));
messageProcessUtil.processMessage(message, executor);
}
消费者配置:
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.request.timeout.ms=40000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=1
kafka.consumer.fetch.max.wait.ms=500
kafka.consumer.auto.offset.reset=earliest
提前感谢您的帮助。
【问题讨论】:
标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka