【问题标题】:Inject ObjectMapper into Spring Kafka serialiser/deserialiser将 ObjectMapper 注入 Spring Kafka 序列化器/反序列化器
【发布时间】:2017-06-09 13:01:09
【问题描述】:

我正在使用 Spring Kafka 1.1.2-RELEASE 和 Spring Boot 1.5.0 RC,并且我配置了一个自定义值序列化器/反序列化器类扩展 org.springframework.kafka.support.serializer.JsonSerializer/org.springframework.kafka.support.serializer.JsonDeserializer。这些类确实使用了可以通过构造函数提供的 Jackson ObjectMapper。

是否有可能从我的 Spring 上下文中注入 ObjectMapper?我已经配置了一个 ObjectMapper,我想在序列化器/反序列化器中重用它。

【问题讨论】:

    标签: java spring spring-boot spring-kafka


    【解决方案1】:

    您可以将JsonSerializerJsonDeserializer 配置为@Beans。 向他们注入所需的ObjectMapper。并在 DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory bean 定义中使用这些 bean:

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            DefaultKafkaProducerFactory<Integer, String> producerFactory = 
                    new DefaultKafkaProducerFactory<>(producerConfigs());
            producerFactory.setValueSerializer(jsonSerializer());
            return producerFactory;
        }
    

    【讨论】:

    • 谢谢,这将解决我的重复问题。但是,关于 Spring Boot 1.5 中 Kafka 的新自动配置,我想我可以使用 org.springframework.boot.autoconfigure.kafka.KafkaProperties#buildProducerProperties 来保持所有其他配置正常工作,但是绕过 spring.kafka.producer.value-serializer 的配置值是否仍然很奇怪?
    • 是的,您必须为您的自定义 bean 重用 buildProducerProper‌​ties()。不,.producer.value-serializer 用于类,由 Kafka 按需实例化。如果你想使用你自己的实例,你只能选择直接注入它。有关此事,请参阅KafkaProducer(Map&lt;String, Object&gt; configs, Serializer&lt;K&gt; keySerializer, Serializer&lt;V&gt; valueSerializer) API。
    【解决方案2】:
    @Component
    public class ObjectMapperProducerFactoryCustomizer implements DefaultKafkaProducerFactoryCustomizer {
    
        private final ObjectMapper objectMapper;
    
        public ObjectMapperProducerFactoryCustomizer(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
        }
    
        @Override
        public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
            if (Objects.nonNull(producerFactory)) {
                producerFactory.setValueSerializer(new JsonSerializer<>(objectMapper));
            }
        }
    
    }
    

    【讨论】:

    • 请避免纯代码回答。至少解释一下为什么这种方式更好,或者它的潜点是什么。
    【解决方案3】:

    对于响应式非阻塞 kafka 客户端,配置如下:

    @Configuration
    @EnableConfigurationProperties(KafkaProperties.class)
    public class KafkaConfig {
    
        @Bean
        public ReactiveKafkaProducerTemplate<String, Object> kafkaProducerTemplate(KafkaProperties p,
                                                                                   ObjectMapper objectMapper) {
            return new ReactiveKafkaProducerTemplate<>(
                    SenderOptions.<String, Object>create(p.buildProducerProperties())
                            .withValueSerializer(new JsonSerializer<>(objectMapper)));
        }
    }
    

    以上暗示您正在使用io.projectreactor.kafka dep,

    dependencies {
        compile "io.projectreactor.kafka:reactor-kafka"
    }
    

    【讨论】:

      猜你喜欢
      • 2019-11-18
      • 2018-02-19
      • 1970-01-01
      • 2018-07-11
      • 1970-01-01
      • 2020-10-05
      • 1970-01-01
      • 2021-10-21
      • 1970-01-01
      相关资源
      最近更新 更多