【问题标题】:Spring Kafka Batch Listener Application RestartSpring Kafka Batch Listener 应用重启
【发布时间】:2021-10-03 18:39:04
【问题描述】:

我正在测试一个 Spring Kafka 批处理侦听器,批处理确认模式,一次轮询 3 条记录并将这些记录保存到数据库中。当 Spring Boot 应用程序重新启动时,我看到另外 3 条记录正在被消耗和处理。

如果批量较大(500)并且在关闭过程完成之前无法将记录保存到数据库中怎么办?我们如何确保这些消息不会丢失或处理这种情况?

2021-09-21 22:37:22,448 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer:  processing batch size: 3, starting partition: 0, offset: 2797145
Disconnected from the target VM, address: '127.0.0.1:55506', transport: 'socket'
2021-09-21 22:38:02,388 WARN  [HikariPool-1 housekeeper]  com.zaxxer.hikari.pool.HikariPool$HouseKeeper: HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=48s84ms).
2021-09-21 22:38:02,393 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer: batch update -> 39.943579103
2021-09-21 22:38:02,494 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer:  processing batch size: 3, starting partition: 0, offset: 2797148
2021-09-21 22:38:02,495 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer: batch update -> 7.52304E-4

【问题讨论】:

    标签: kafka-consumer-api spring-kafka


    【解决方案1】:

    除非侦听器正常退出,否则不会提交偏移量 - 只要您使用具有默认错误处理程序 (SeekToCurrentBatchErrorHandler) 的最新受支持版本。

    【讨论】:

      猜你喜欢
      • 2018-10-26
      • 1970-01-01
      • 2017-08-20
      • 1970-01-01
      • 1970-01-01
      • 2016-12-19
      • 2017-07-23
      • 1970-01-01
      • 2023-03-24
      相关资源
      最近更新 更多