【问题标题】:Spring Kafka: applying KafkaListenerErrorHandler to all KafkaListenerSpring Kafka:将 KafkaListenerErrorHandler 应用于所有 KafkaListener
【发布时间】:2021-02-02 21:07:15
【问题描述】:

我想拦截我的@KafkaListener 抛出的验证错误 (MethodArgumentNotValidException)。为了实现这一点,我创建了实现 KafkaListenerErrorHandler 的 DefaultKafkaValidationErrorHandler:

@Component
public class DefaultKafkaValidationErrorHandler implements KafkaListenerErrorHandler {


    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
       //here is some logic
    }

    @Override
    public final Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        //such as here
    }
}

并为注册商设置验证器:

@Component
public class KafkaListenerConfig implements KafkaListenerConfigurer {
    @Autowired
    private LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }

}

当我将这个处理程序直接放在我的监听器上时它工作正常

@KafkaListener(
            containerFactory = "factory",
            topics = "topic",
            groupId = "group.id",
            errorHandler = "defaultKafkaValidationErrorHandler")

我的问题是,有没有办法将我的 DefaultKafkaValidationErrorHandler 应用到我项目中的所有@KafkaListener,而不是直接像提到的那样。也许通过 ConcurrentKafkaListenerContainerFactory、KafkaListenerEndpointRegistrar 或其他方式

【问题讨论】:

    标签: java spring apache-kafka error-handling spring-kafka


    【解决方案1】:

    您可以覆盖ConcurrentKafkaListenerContainerFactoryC createListenerContainer(KafkaListenerEndpoint endpoint); 方法,并将您的全局KafkaListenerErrorHandler 设置为单个endpoint。您的工厂必须在 KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME bean 名称下注册,以避免在每个 @KafkaListener 上配置此工厂。

    【讨论】:

      猜你喜欢
      • 2019-08-24
      • 2019-09-16
      • 2021-12-28
      • 2018-06-27
      • 2019-01-20
      • 2020-10-03
      • 2017-11-30
      • 1970-01-01
      • 2020-02-08
      相关资源
      最近更新 更多