【问题标题】:Examples on spring kafka batch processing with filter strategy and manual commit使用过滤策略和手动提交的spring kafka批处理示例
【发布时间】:2026-02-23 07:30:01
【问题描述】:

我打算使用 spring kafka 批处理侦听器进行批处理。我正在为这 2 个场景寻找一些样本。

  1. 我们如何通过批处理实现过滤记录策略?更新:来自文档-“此外,还提供了 FilteringBatchMessageListenerAdapter,供您使用批处理消息侦听器时使用。”不清楚。我没有看到任何容器工厂方法来设置此 filterbatchmessagelisteneradapter 对象或过滤器实现。

这是我的批处理侦听器过滤策略代码:

@Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setBatchListener(true);
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
            @Override
            public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {

                //log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());

                return true;
            }
        });
        
        return factory;
    }
  1. 一旦我们在消费者中检索到这批消息并全部处理完毕,我们如何进行手动偏移提交。在批处理过程中,如果出现任何故障,只想将该消息推送到错误主题。但最后我想一次提交整个批处理。

现在我想到的另一个问题是上述场景如何与单个消费者和多个消费者一起工作。

假设案例 1:单一消费者

假设我们有一个包含 5 个分区的主题。当我们订阅该主题时,我们假设我们从该主题中获得了 100 条消息,其中每个分区有 20 条消息。如果我们要提交这些消息偏移量,确认对象是否保存了最后一条消息的每个分区和最后一个偏移量?

案例 2:多个消费者

使用与case1中提到的相同的输入,如果我们启用相等的消费者数量和分区计数,ack对象是否持有分区和最后一条消息的偏移量?

你能帮我解决这个问题吗?

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:
    1. FilteringBatchMessageListenerAdapterhttps://docs.spring.io/spring-kafka/docs/current/reference/html/#filtering-messages

    2. 使用批处理处理异常的最简单方法是使用RecoveringBatchErrorHandlerDeadLetterPublishingRecoverer。抛出BatchListenerFailedException,表示批次中哪条记录失败;成功记录的偏移量被提交,剩余的记录(包括失败的记录)将被重新传递,直到重试(如果配置)用尽,失败的记录将进入死信主题,其余的将被重新传递。

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh

    是的,当批次被确认时,批次中每个分区的最新偏移量 (+1) 将被提交。

    如果您有多个消费者,则分区将分布在这些消费者之间。

    【讨论】:

    • 感谢加里的回复。我修改了2个问题。对于过滤策略,您能否提供示例代码如何实现过滤批处理消息侦听器适配器?我没有看到任何容器工厂方法来设置这个 filterbatchmessagelisteneradapter 对象。
    • 只要给容器工厂添加过滤策略containerFactory.setRecordFilterStrategy();框架将连接使用该策略配置的过滤适配器。
    • 这会处理过滤器批处理标准吗? factory.setBatchListener(true); factory.setAckDiscarded(true); factory.setRecordFilterStrategy(new RecordFilterStrategy() { @Override public boolean filter(ConsumerRecord consumerRecord) { //log.info("Apply filter criteria on the record {} ", consumerRecord.val( )); 返回 true; } });
    • 是的;那是对的。不要将代码放在 cmets 中;太难读了;改为编辑问题并评论您已这样做。
    • 我怀疑容器如何知道它是过滤一批记录还是单个记录,并在内部使用 filterbatchmessagelisteneradapter 对象?是否基于 factory.setBatchListener(true); factory.setAckDiscarded(true);价值观?