【发布时间】:2026-02-23 07:30:01
【问题描述】:
我打算使用 spring kafka 批处理侦听器进行批处理。我正在为这 2 个场景寻找一些样本。
- 我们如何通过批处理实现过滤记录策略?更新:来自文档-“此外,还提供了 FilteringBatchMessageListenerAdapter,供您使用批处理消息侦听器时使用。”不清楚。我没有看到任何容器工厂方法来设置此 filterbatchmessagelisteneradapter 对象或过滤器实现。
这是我的批处理侦听器过滤策略代码:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {
//log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
return true;
}
});
return factory;
}
- 一旦我们在消费者中检索到这批消息并全部处理完毕,我们如何进行手动偏移提交。在批处理过程中,如果出现任何故障,只想将该消息推送到错误主题。但最后我想一次提交整个批处理。
现在我想到的另一个问题是上述场景如何与单个消费者和多个消费者一起工作。
假设案例 1:单一消费者
假设我们有一个包含 5 个分区的主题。当我们订阅该主题时,我们假设我们从该主题中获得了 100 条消息,其中每个分区有 20 条消息。如果我们要提交这些消息偏移量,确认对象是否保存了最后一条消息的每个分区和最后一个偏移量?
案例 2:多个消费者
使用与case1中提到的相同的输入,如果我们启用相等的消费者数量和分区计数,ack对象是否持有分区和最后一条消息的偏移量?
你能帮我解决这个问题吗?
【问题讨论】:
标签: spring-kafka