【问题标题】:Spring-Kafka RetryableTopic causes RecordTooLargeExceptionSpring-Kafka RetryableTopic 导致 RecordTooLargeException
【发布时间】:2025-12-28 17:30:11
【问题描述】:

我正在使用 spring-kafka RetryableTopic 进行具有固定 BackOff 和单个重试主题的非阻塞重试 (https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries)

我注意到当重试次数相对较高时我得到RecordTooLargeException,并且在检查消息时我看到它包含所有先前尝试的 Kafka 标头,并且像 kafka_exception-stacktrace 这样的一些标头它们非常重。

为什么它会尝试发布带有先前重试标头的重试消息? 我找不到任何配置。 是否可以在发布前以某种方式操纵这些标头以剪切它们?

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    好点;我打开了一个问题。

    https://github.com/spring-projects/spring-kafka/issues/1994

    在实施之前,我会看看我是否能想出一个变通办法。

    编辑

    public class HeaderStrippingInterceptor<K, V> implements ProducerInterceptor<K, V> {
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    
        @Override
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            Header header = record.headers().lastHeader(KafkaHeaders.EXCEPTION_STACKTRACE);
            if (header != null) {
                record.headers().remove(KafkaHeaders.EXCEPTION_STACKTRACE);
                record.headers().add(header);
            }
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }
    
        @Override
        public void close() {
        }
    
    }
    

    然后将其类名添加到生产者配置中;例如,使用 Spring Boot:

    spring:
      kafka:
        producer:
          properties:
            "[interceptor.classes]": com.example.demo.HeaderStrippingInterceptor
    

    【讨论】:

    • 我添加了一个解决方法;查看编辑。