【Title】: Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double
【Posted】: 2021-02-17 10:25:46
【Question】:

I am using the annotation-based Spring Kafka listener to consume Kafka messages. The code is below.

  1. The Employee object being consumed:
class Employee {
    private String name;
    private String address;
    private Object account;
    // getters
    // setters
}
  2. The account object is decided at runtime: it can be a savings account, a current account, and so on.
class SavingAcc {
    private BigDecimal balance;
}

class CurrentAcc {
    private BigDecimal balance;
    private BigDecimal limit;
}

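  3. Both the savings account and the current account have BigDecimal fields to store the balance.
  4. So when the Employee object is sent from the Kafka producer, all fields are mapped correctly and appear in the correct BigDecimal format.
  5. But when the Employee object is consumed in another service, the account object appears as a LinkedHashMap and the BigDecimal fields are converted to Double, which causes problems.
  6. As far as I understand, the main cause could be either a) declaring account as type Object instead of a specific type, or b) the deserializer needing to be configured more specifically. [I already pass Employee.class as the type to the Kafka receiver's deserializer, so the Employee fields are mapped correctly but the account fields are wrong.]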
@Bean
public ConsumerFactory<String, Employee> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Employee.class));
}

I need help with how to map, or how to correctly deserialize, the account field.
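
To illustrate why the conversion to Double described in the question matters, here is a minimal JDK-only sketch. The class and method names are made up for illustration, and it involves neither Kafka nor Jackson; it only pushes a BigDecimal through a double and back, which is roughly what happens to a balance that has been bound as a Double.

```java
import java.math.BigDecimal;

class BigDecimalViaDouble {

    // Converts to double and back using the exact binary value of the double.
    static BigDecimal exactRoundTrip(BigDecimal original) {
        double asDouble = original.doubleValue();
        return new BigDecimal(asDouble);
    }

    // Converts to double and back via the double's shortest decimal string,
    // then reports the scale (number of fraction digits) that survives.
    static int scaleAfterDouble(BigDecimal original) {
        return BigDecimal.valueOf(original.doubleValue()).scale();
    }

    public static void main(String[] args) {
        BigDecimal balance = new BigDecimal("1234567890.1234567891");
        BigDecimal back = exactRoundTrip(balance);
        System.out.println("original : " + balance);
        System.out.println("via double: " + back);
        System.out.println("same value: " + (balance.compareTo(back) == 0));

        BigDecimal price = new BigDecimal("100.10");
        System.out.println("scale before: " + price.scale());
        System.out.println("scale after : " + scaleAfterDouble(price));
    }
}
```

A value with more significant digits than a double can hold does not survive the trip, and even a short value such as 100.10 loses its trailing zero, so equals() comparisons and fixed-scale formatting can change on the consumer side.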

【Comments】:

    Tags: spring-boot apache-kafka microservices kafka-consumer-api spring-kafka


    【Solution 1】:

    Use generics and a custom JavaType method.

    class Employee<T> {
        private String name;
        private String address;
        private T account;
        // getters
        // setters
    }
    
    static final JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);
    
    static final JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);
    
    public static JavaType determineType(String topic, byte[] data, Headers headers) {
        // isCurrentAccount(...) is a placeholder, not a library call:
        // inspect the data, headers, or topic here to decide the account type
        if (isCurrentAccount(topic, data, headers)) {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }
    

    If you construct the deserializer yourself, use

    deser.setTypeResolver(MyClass::determineType);
    

    When configuring with properties, use

    spring.json.value.type.method=com.mycompany.MyClass.determineType
    

    You have to inspect the data or the headers (or the topic) to determine which type you want.
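
As a rough sketch of the "construct the deserializer yourself" route, the configuration fragment below wires a type resolver into a consumer factory. Several things in it are assumptions rather than part of the answer: the class name EmployeeConsumerConfig, the broker address, the group id, the one-topic-per-account-kind rule with the topic name employees-current, and the minimal stand-in model classes. It also assumes a spring-kafka version that provides setTypeResolver, as the answer's own call does.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;

@Configuration
public class EmployeeConsumerConfig {

    // Minimal stand-ins for the model classes in the answer (public fields for brevity).
    public static class Employee<T> {
        public String name;
        public String address;
        public T account;
    }

    public static class SavingAcc {
        public BigDecimal balance;
    }

    public static class CurrentAcc {
        public BigDecimal balance;
        public BigDecimal limit;
    }

    private static final JavaType WITH_CURRENT = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAcc.class);

    private static final JavaType WITH_SAVING = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAcc.class);

    // Hypothetical rule: current accounts arrive on their own topic.
    public static JavaType determineType(String topic, byte[] data, Headers headers) {
        return "employees-current".equals(topic) ? WITH_CURRENT : WITH_SAVING;
    }

    @Bean
    public ConsumerFactory<String, Employee<?>> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "employee-consumers");      // assumed group id

        // The resolver is consulted for each record to pick the target JavaType.
        JsonDeserializer<Employee<?>> deser = new JsonDeserializer<>();
        deser.setTypeResolver(EmployeeConsumerConfig::determineType);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deser);
    }
}
```

Because the target type carries the concrete account class, the balance and limit fields are declared as BigDecimal at binding time instead of falling back to a generic map.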

    EDIT

    Here is a complete example. In this case I pass a type hint in the Account object, but an alternative is to set a header on the producer side.

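    package com.example.demo;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.math.BigDecimal;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.header.Headers;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    
    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.type.TypeFactory;
    import com.jayway.jsonpath.JsonPath;
    
    import lombok.Data;
    
    @SpringBootApplication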
    public class JacksonApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(JacksonApplication.class, args);
        }
    
        @Data
        public static class Employee<T extends Account> {
            private String name;
            private T account;
        }
    
        @Data
        public static abstract class Account {
            private final String type;
            protected Account(String type) {
                this.type = type;
            }
        }
    
        @Data
        public static class CurrentAccount extends Account {
            private BigDecimal balance;
            private BigDecimal limit;
            public CurrentAccount() {
                super("C");
            }
        }
    
        @Data
        public static class SavingAccount extends Account {
            private BigDecimal balance;
            public SavingAccount() {
                super("S");
            }
        }
    
        @KafkaListener(id = "empListener", topics = "employees")
        public void listen(Employee<Account> e) {
            System.out.println(e);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("employees").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
            return args -> {
                Employee<CurrentAccount> emp1 = new Employee<>();
                emp1.setName("someOneWithACurrentAccount");
                CurrentAccount currentAccount = new CurrentAccount();
                currentAccount.setBalance(BigDecimal.ONE);
                currentAccount.setLimit(BigDecimal.TEN);
                emp1.setAccount(currentAccount);
                template.send("employees", emp1);
                Employee<SavingAccount> emp2 = new Employee<>();
                emp2.setName("someOneWithASavingAccount");
                SavingAccount savingAccount = new SavingAccount();
                savingAccount.setBalance(BigDecimal.ONE);
                emp2.setAccount(savingAccount);
                template.send("employees", emp2);
            };
        }
    
        private static final JavaType withCurrent = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, CurrentAccount.class);
    
        private static final JavaType withSaving = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, SavingAccount.class);
    
        public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
            if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
                return withCurrent;
            }
            else {
                return withSaving;
            }
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType
    

    Result

    JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
    JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))
    

    POM

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.5.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>jackson</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>11</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
            <dependency>
                <groupId>com.jayway.jsonpath</groupId>
                <artifactId>json-path</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    EDIT2

    Here is an example that conveys the type hint in a header instead...

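    package com.example.demo2;
    
    import java.io.IOException;
    import java.math.BigDecimal;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.serializer.JsonSerializer;
    
    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.type.TypeFactory;
    
    import lombok.Data;
    
    @SpringBootApplication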
    public class JacksonApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(JacksonApplication.class, args);
        }
    
        @Data
        public static class Employee<T extends Account> {
            private String name;
            private T account;
        }
    
        @Data
        public static abstract class Account {
        }
    
        @Data
        public static class CurrentAccount extends Account {
            private BigDecimal balance;
            private BigDecimal limit;
        }
    
        @Data
        public static class SavingAccount extends Account {
            private BigDecimal balance;
        }
    
        @KafkaListener(id = "empListener", topics = "employees")
        public void listen(Employee<Account> e) {
            System.out.println(e);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("employees").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
            return args -> {
                Employee<CurrentAccount> emp1 = new Employee<>();
                emp1.setName("someOneWithACurrentAccount");
                CurrentAccount currentAccount = new CurrentAccount();
                currentAccount.setBalance(BigDecimal.ONE);
                currentAccount.setLimit(BigDecimal.TEN);
                emp1.setAccount(currentAccount);
                template.send("employees", emp1);
                Employee<SavingAccount> emp2 = new Employee<>();
                emp2.setName("someOneWithASavingAccount");
                SavingAccount savingAccount = new SavingAccount();
                savingAccount.setBalance(BigDecimal.ONE);
                emp2.setAccount(savingAccount);
                template.send("employees", emp2);
            };
        }
    
        private static final JavaType withCurrent = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, CurrentAccount.class);
    
        private static final JavaType withSaving = TypeFactory.defaultInstance()
                .constructParametricType(Employee.class, SavingAccount.class);
    
        public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
            if (headers.lastHeader("accountType").value()[0] == 'C') {
                return withCurrent;
            }
            else {
                return withSaving;
            }
        }
    
        public static class MySerializer extends JsonSerializer<Employee<?>> {
    
            @Override
            public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
                headers.add(new RecordHeader("accountType",
                        new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
                return super.serialize(topic, headers, emp);
            }
    
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType
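
One thing to watch in the header-based variant: lastHeader returns null when no accountType header is present, so records from a producer that does not set it would fail inside determineType. The sketch below isolates the hint handling in plain JDK code so it can be unit-tested without Kafka. The class name AccountTypeHint, the Kind enum, and the choice to fall back to a savings account when the hint is missing are all assumptions for illustration.

```java
class AccountTypeHint {

    enum Kind { CURRENT, SAVING }

    // Encodes the hint as a single byte: 'C' for current, 'S' for saving,
    // mirroring the one-byte header value used in the example serializer.
    static byte[] encode(Kind kind) {
        return new byte[] { (byte) (kind == Kind.CURRENT ? 'C' : 'S') };
    }

    // Decodes a raw header value. A null or empty value falls back to SAVING
    // (an assumed default, matching the else branch of the example resolver).
    static Kind decode(byte[] headerValue) {
        if (headerValue == null || headerValue.length == 0) {
            return Kind.SAVING;
        }
        return headerValue[0] == 'C' ? Kind.CURRENT : Kind.SAVING;
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(Kind.CURRENT)));
        System.out.println(decode(encode(Kind.SAVING)));
        System.out.println(decode(null));
    }
}
```

Inside determineType, the header lookup would be guarded first (take the result of headers.lastHeader("accountType"), pass null to decode if the header itself is null, otherwise pass its value()), and the returned Kind then selects withCurrent or withSaving.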
    

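    【Comments】:

    • Thanks @Gary, I tried the code below but somehow I cannot access the child class properties. When setting the account object, I cast it to the specific account type. I am using Lombok annotations. Could you please help? ```Employee { private Account account; } Account { private String accountNo; } SavingAccount extends Account { private BigDecimal balance; } CurrentAccount extends Account { private BigDecimal balance; } ```
    • I added a complete example.
    • I added another example that conveys the type hint in a header, which avoids using JsonPath to extract the type.
    【Solution 2】: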

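    This annotation solved my problem:

    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    private T account;
    

    It binds the concrete class of the generic parameter to the field: Jackson writes the class name into the JSON as an @class property and reads it back to pick the type when deserializing.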

    【Comments】:
