【问题标题】:Spring kafka - Offset not setting on errorSpring kafka - 偏移量未设置错误
【发布时间】:2020-03-11 02:06:56
【问题描述】:

在我的监听器中消费消息后,如果发生任何异常,那么我会抛出一个异常。如果它成功,那么我承认。但即使抛出异常,偏移量也不会被设置。即重试没有按预期发生。错误事件不会再次出现。

我还看到我没有使用所有预期的消息。是不是我做错了什么?

@Bean
public ConcurrentKafkaListenerContainerFactory<String,Result<FormatException,String>> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Result<FormatException, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(this.consumerFactory());
    factory.setErrorHandler(new SeekToCurrentErrorHandler(3));
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return factory;
}


public ConsumerFactory<String, Result<FormatException, String>> consumerFactory() {
   kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

监听类

@KafkaListener(topics = "${gpc.consumer.topic}")
    public void listenWithHeadersGPC(ConsumerRecord<String, Result<FormatException, String>> consumerRecord,
            Acknowledgment acknowledgment) {
        try {
            String body = consumerRecord.value().get();
            log.logInfo("Received message for " + GPCSourceName + ": " + body);
            log.logInfo("consumption successful from kafka. partition="+consumerRecord.partition()+", "
                    + "offset="+consumerRecord.offset());
                            if(body.contains("berlin+braze@gmail.com")) {
                                throw new RuntimeException("custom exception");
                            }
                            log.logInfo("Filtered message for " + GPCSourceName + " : " + body);
            acknowledgment.acknowledge();
        }catch(Exception e) {
            log.logError("Exception occurred in listenWithHeadersGPC: ", e);
            throw e;
        }
    }

如果没有例外,我会在我的听众中使用acknowledgment.acknowledge();

编辑:Spring Kafka 版本:2.2.8

【问题讨论】:

    标签: spring-boot spring-kafka


    【解决方案1】:

    factory.setErrorHandler(new SeekToCurrentErrorHandler(3));

    使用该配置,您应该会看到 3 次重试,它们之间的延迟为零;最后记录将被记录。 默认情况下,“恢复”记录的偏移量不会被提交。

    发布此类问题时;指明您使用的是哪个版本。

    从2.3版本开始,可以使用acknowledgment.nack(sleepTime);您也可以在SeekToCurrentErrorHandler 上调用setCommitRecovered(true) 并提交恢复记录的偏移量(使用MANUAL_IMMEDIATE)。

    这一切只有在向容器抛出异常时才会发生;如果您在侦听器中捕获异常,您将不会看到重试。

    如果您根本没有看到重试,并且您的侦听器抛出异常,则您的侦听器或配置有问题。显示所有配置和@KafkaListener 代码。

    【讨论】:

    • 感谢您的回复。抛出异常后,重试工作正常,但进入无限循环
    • 使用 new SeekToCurrentErrorHandler(3) 应该在 3 次交付尝试后放弃;其他事情正在发生。再次;编辑问题以显示所有相关代码。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-29
    • 1970-01-01
    • 2018-09-25
    • 2019-04-04
    • 1970-01-01
    • 2014-08-08
    相关资源
    最近更新 更多