【问题标题】:How to consume and save a List of custom types in Spring Boot using Apache Kafka?如何使用 Apache Kafka 在 Spring Boot 中使用和保存自定义类型列表?
【发布时间】:2020-01-01 18:37:39
【问题描述】:

我正在编写一个 Kafka 生产者和消费者来发送我的自定义数据并在消费后保存。我做了所有必需的 Kafka 配置,但我得到了一个强制转换异常。

这些是代码:

消费者:

public class Consumer {
    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    private RecordService service;

    @KafkaListener(topics = "data", groupId = "group_id", containerFactory = "kafkaListenerContainerFactory")
    public void consume(List<Record> message) {
        logger.info(String.format("$$ -> Consumed Message -> %s", "message"));

        System.out.println(message);
        service.saveAll(message);
    }
}

制作人:

public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "data";
    @Autowired
    private KafkaTemplate<String, List<Record>> kafkaTemplate;

    public void sendMessage(List<Record> list) {
        logger.info(String.format("$$ -> Producing message --> %s", "message"));
        // Message<List<Record>> message =
        // MessageBuilder.withPayload(list).setHeader(KafkaHeaders.TOPIC,
        // TOPIC).build();

        this.kafkaTemplate.send(TOPIC, list);
    }
}

消费者的KafkaConfig

@Configuration
public class KafkaConfiguration {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, List<Record>> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(List.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, List<Record>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, List<Record>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

生产者的KafkaConfig:

public class KafkaConfiguration {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, List<Record>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, List<Record>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

消息成功发送并打印到控制台,但类型不是我期望的。我发送了一个列表,但它作为 LinkedHashMap 的列表接收。存在堆栈跟踪错误:

2019-08-28 17:41:52.612  INFO 22236 --- [ntainer#0-0-C-1] com.consumerApp.models.Consumer          : $$ -> Consumed Message -> message
[{region=Central America and the Caribbean, country=Haiti, itemType=Office Supplies, salesChannel=Online, orderPriority=C, orderDate=1293739200000, orderId=485070693, shipDate=1296417600000, unitsSold=2052, unitPrice=651.21, unitCost=524.96, totalRevenue=1336282.92, totalCost=1077217.92, totalProfit=259065.0}, {region=Central America and the Caribbean, country=Nicaragua, itemType=Household, salesChannel=Online, orderPriority=C, orderDate=1445976000000, orderId=573998582, shipDate=1449432000000, unitsSold=7791, unitPrice=668.27, unitCost=502.54, totalRevenue=5206491.57, totalCost=3915289.14, totalProfit=1291202.43}]
2019-08-28 17:41:52.613 ERROR 22236 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = data, partition = 0, offset = 16, CreateTime = 1566999712608, serialized key size = -1, serialized value size = 675, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [{region=Central America and the Caribbean, country=Haiti, itemType=Office Supplies, salesChannel=Online, orderPriority=C, orderDate=1293739200000, orderId=485070693, shipDate=1296417600000, unitsSold=2052, unitPrice=651.21, unitCost=524.96, totalRevenue=1336282.92, totalCost=1077217.92, totalProfit=259065.0}, {region=Central America and the Caribbean, country=Nicaragua, itemType=Household, salesChannel=Online, orderPriority=C, orderDate=1445976000000, orderId=573998582, shipDate=1449432000000, unitsSold=7791, unitPrice=668.27, unitCost=502.54, totalRevenue=5206491.57, totalCost=3915289.14, totalProfit=1291202.43}])

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.consumerApp.models.Consumer.consume(java.util.List<com.consumerApp.models.Record>)' threw exception; nested exception is java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to com.consumerApp.models.Record; nested exception is java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to com.consumerApp.models.Record
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1261) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to com.consumerApp.models.Record
    at com.consumerApp.models.RecordService.saveAll(RecordService.java:15) ~[classes/:na]
    at com.consumerApp.models.Consumer.consume(Consumer.java:23) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    ... 8 common frames omitted

为什么它没有作为列表接收?有任何想法吗?谢谢。

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka


    【解决方案1】:

    你得到的类型是List&lt;LinkedHashMap&gt;,因为你的反序列化器是JsonDeserializer&lt;&gt;(List.class),它的实际目标类型是Lis&lt;Object&gt;,最终jackson将其转换为List&lt;LinkedHashMap&gt;

    String str = "[{\"region\": \"Central America and the Caribbean\", \"country\":\"Haiti\", " +
                    "\"itemType\":\"Office Supplies\"}]";
    List list = new ObjectMapper().readValue(str, List.class);
    System.out.println(list.get(0).getClass());
    

    它打印class java.util.LinkedHashMap

    据我所知,您应该使用自定义反序列化器。

    static class CustomDeserializer extends JsonDeserializer<List<MyRecord>> {
    
            @Override
            public List<MyRecord> deserialize(String topic, Headers headers, byte[] data) {
                return deserialize(topic, data);
            }
    
            @Override
            public List<MyRecord> deserialize(String topic, byte[] data) {
                if (data == null) {
                    return null;
                }
                try {
                    return objectMapper.readValue(data, new TypeReference<List<MyRecord>>() {
                    });
                } catch (IOException e) {
                    throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                            "] from topic [" + topic + "]", e);
                }
            }
        }
    
    

    【讨论】:

    • 感谢您的回答。这说得通。但奇怪的是,我不能在泛型中给出我自己的类型。也许可以将 List 定义为一个新类“SomeNewClass”,然后将其作为 new JsonDeserializer&lt;&gt;(SomeNewClass.class) 返回我想要的类型?
    • 很高兴它有帮助! Java泛型是通过类型擦除实现的,所以没有类型List&lt;Record&gt;,在序列化/反序列化时,我们无法获取列表中元素的实际类型。如果你定义一个像class Records extends ArrayList&lt;Record&gt;这样的类,它适用于新产生的消息,但如果你想读取旧消息,你应该像这样创建反序列化器:new JsonDeserializer&lt;&gt;(Records.class, false)
    • 我已经尝试了这两种方法,带有自定义反序列化器的方法效果很好,所以我将其标记为已解决。另一个对我不起作用,给出了这个错误:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source我尝试了一个信任包的解决方案,仍然没有工作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-11-25
    • 1970-01-01
    • 2020-02-22
    • 2020-07-11
    • 1970-01-01
    • 2018-11-05
    • 2019-06-26
    相关资源
    最近更新 更多