【发布时间】:2021-10-03 18:39:04
【问题描述】:
我正在测试一个 Spring Kafka 批处理侦听器,批处理确认模式,一次轮询 3 条记录并将这些记录保存到数据库中。当 Spring Boot 应用程序重新启动时,我看到另外 3 条记录正在被消耗和处理。
如果批量较大(500)并且在关闭过程完成之前无法将记录保存到数据库中怎么办?我们如何确保这些消息不会丢失或处理这种情况?
2021-09-21 22:37:22,448 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: processing batch size: 3, starting partition: 0, offset: 2797145
Disconnected from the target VM, address: '127.0.0.1:55506', transport: 'socket'
2021-09-21 22:38:02,388 WARN [HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool$HouseKeeper: HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=48s84ms).
2021-09-21 22:38:02,393 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: batch update -> 39.943579103
2021-09-21 22:38:02,494 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: processing batch size: 3, starting partition: 0, offset: 2797148
2021-09-21 22:38:02,495 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: batch update -> 7.52304E-4
【问题讨论】:
标签: kafka-consumer-api spring-kafka