【问题标题】:Spring Kafka JsonDesirialization MessageConversionException failed to resolve class name Class not foundSpring Kafka JsonDesiralization MessageConversionException 无法解析类名 Class not found
【发布时间】:2019-07-08 11:40:50
【问题描述】:

我有两个服务应该通过Kafka 进行通信。 我们将第一个服务称为 WriteService,将第二个服务称为 QueryService

WriteService 方面,我为生产者提供了以下配置。

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

我正在尝试发送com.example.project.web.routes.dto.RouteDto 类的对象

QueryService方面,消费者配置定义如下。

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

侦听器具有以下定义。有效负载类具有完全限定名称 - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                               @Payload RouteDto payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }

发送消息时,我收到以下错误

原因: org.springframework.messaging.converter.MessageConversionException: 无法解析类名。找不到类 [com.example.project.web.routes.dto.RouteDto];嵌套异常是 java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

错误很清楚。但是我不明白为什么它默认有这种行为。我不希望在不同的服务中有相同的包,这根本没有意义。

我还没有找到一种方法来禁用它并使用提供给监听器的类,用@Payload注释

如何在不手动配置映射器的情况下解决这个问题?

【问题讨论】:

    标签: spring spring-boot apache-kafka json-deserialization spring-kafka


    【解决方案1】:

    如果您使用的是 spring-kafka-2.2.x,您可以通过 JsonDeserializer docs 的重载构造函数禁用默认标头

    从 2.2 版开始,您可以使用具有布尔值 useHeadersIfPresent(默认为 true)的重载构造函数之一显式配置反序列化程序以使用提供的目标类型并忽略标头中的类型信息。以下示例显示了如何执行此操作:

    DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
    

    如果使用较低版本,请使用MessageConverter(您可能会在 spring-kafka-2.1.x 及更高版本中看到此问题)

    Spring for Apache Kafka 提供 MessageConverter 抽象,其中包含 MessagingMessageConverter 实现及其 StringJsonMessageConverter 和 BytesJsonMessageConverter 自定义。您可以直接将 MessageConverter 注入 KafkaTemplate 实例,并使用 @KafkaListener.containerFactory() 属性的 AbstractKafkaListenerContainerFactory bean 定义。以下示例显示了如何执行此操作:

    @Bean
     public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
      }
    
      @KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
        public void jsonListener(RouteDto dto) {
         ...
       }
    

    注意:这种类型推断只有在方法级别声明了@KafkaListener注解时才能实现。使用类级别的@KafkaListener,payload 类型用于选择调用哪个@KafkaHandler 方法,因此必须已经转换后才能选择方法。

    【讨论】:

    • 感谢您的回答,是否可以在不明确提供具体类的情况下进行配置?所以解串器会从@Payload?中推断出一个类型?
    • 然后使用@CROSP帖子中添加的MessageConverter
    • 感谢您的完整解释,我对这个配置感到有些困惑,即使在后台使用Reflection 也需要额外的步骤。我用了Jackson,没有遇到这样的问题
    • 哪个版本的spring-kafka?我不想降级,但目前我正在使用 spring-kafka-2.0.5 没有任何问题
    猜你喜欢
    • 2018-08-22
    • 1970-01-01
    • 2018-12-09
    • 2017-05-26
    • 2011-06-15
    • 2017-08-04
    • 2018-10-10
    • 1970-01-01
    • 2018-10-02
    相关资源
    最近更新 更多