【问题标题】:Handling exception in Spring Kafka处理 Spring Kafka 中的异常
【发布时间】:2020-05-27 23:10:35
【问题描述】:

我正在使用 spring-kafka 2.2.6。我使用过 SeekToCurrentErrorHandler 和 ErrorHandlingDeserializer2。 SeekToCurrentErrorHandler 当前配置为在重试三次后记录消息。有没有办法跳过验证错误(由 Spring 中的 Validator 实现捕获)和消息转换错误的重试?所有错误都被容器错误处理程序(即 SeeToCurrentErrorHandler)拦截。是否应该重写 SeeToCurrentErrorHandler 的句柄方法?

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((c, e) -> {
        LOG.info(e.getMessage());
    }, this.kafkaConfigProperties.getRetryCount()));
    return factory;
}

 @Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.forEach((key, value) -> {
        map.put((String) key, value);
    });
    KafkaSoapMessageConverter kafkaSoapMessageConverter = new KafkaSoapMessageConverter();
    Map<String, Object> configMap = new HashMap<>(1);
    configMap.put(KafkaSoapMessageConverter.CLASS_TO_DESERIALIZE, MyClass.class);
    kafkaSoapMessageConverter.configure(configMap, false);
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
            kafkaSoapMessageConverter);
    DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
    consumerFactory.setValueDeserializer(errorHandlingDeserializer);
    return consumerFactory;
}

编辑

我使用了下面的代码

if(DeserializationException.class == e.getClass() 
        || e.getCause().getClass() == MethodArgumentNotValidException.class) {
    SeekUtils.doSeeks(records, consumer, e, true, (c, e) -> { return true; }, LOG); 
} else {
    super.handle(e, records, consumer, container);
}

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka


    【解决方案1】:

    2.3 版(当前为 2.3.5)增加了配置哪些异常可重试的功能:

    /**
     * Set an exception classifications to determine whether the exception should cause a retry
     * (until exhaustion) or not. If not, we go straight to the recoverer. By default,
     * the following exceptions will not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried.
     * When calling this method, the defaults will not be applied.
     * @param classifications the classifications.
     * @param defaultValue whether or not to retry non-matching exceptions.
     * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
     */
    public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
    

    这是默认设置的方式:

        Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
        classified.put(DeserializationException.class, false);
        classified.put(MessageConversionException.class, false);
        classified.put(MethodArgumentResolutionException.class, false);
        classified.put(NoSuchMethodException.class, false);
        classified.put(ClassCastException.class, false);
    

    此外,您可以为默认设置添加例外:

    /**
     * Add an exception type to the default list; if and only if an external classifier
     * has not been provided. By default, the following exceptions will not be retried:
     * <ul>
     * <li>{@link DeserializationException}</li>
     * <li>{@link MessageConversionException}</li>
     * <li>{@link MethodArgumentResolutionException}</li>
     * <li>{@link NoSuchMethodException}</li>
     * <li>{@link ClassCastException}</li>
     * </ul>
     * All others will be retried.
     * @param exceptionType the exception type.
     * @see #removeNotRetryableException(Class)
     * @see #setClassifications(Map, boolean)
     */
    public void addNotRetryableException(Class<? extends Exception> exceptionType) {
        Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
                "Cannot add exception types to a supplied classifier");
        ((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().put(exceptionType, false);
    }
    

    【讨论】:

    • 谢谢加里·拉塞尔。一旦我们组织中的框架开始发布 v2.3,我将采用这个框架。到那时我可以在 sn-p 下使用(覆盖 SeekToCurrentErrorHandler 的 hanlde 方法)还是你认为有更好的选择 if(DeserializationException.class== e.getClass() || e.getCause().getClass() == MethodArgumentNotValidException .class) { SeekUtils.doSeeks(records, consumer, e, true, (c, e) -> { return true; }, LOG); } else { super.handle(e, 记录, 消费者, 容器); }
    • 看起来还可以,但是cmets中的代码很难看懂;改为编辑问题并添加评论以说明您已这样做。
    猜你喜欢
    • 1970-01-01
    • 2021-04-12
    • 1970-01-01
    • 1970-01-01
    • 2019-02-17
    • 1970-01-01
    • 2015-10-08
    • 2021-11-12
    • 1970-01-01
    相关资源
    最近更新 更多