【问题标题】:How to read specific data from message from kafka with Spring-kafka @KafkaListener?如何使用 Spring-kafka @KafkaListener 从来自 kafka 的消息中读取特定数据?
【发布时间】:2026-02-05 17:00:02
【问题描述】:
我的任务是读取来自多个不同主题的事件(所有主题中所有数据的类都是“事件”)。此类包含字段“数据”(Map),该字段携带特定于每个主题的数据,可以反序列化为特定的类(例如“DeviceCreateEvent”或smth。)。我可以使用@KafkaListener 在参数类型为“Event”的方法上为每个主题创建消费者。但在这种情况下,首先我必须 event.getData() 并将其反序列化为特定的类,因此我将在所有消费者方法中获得代码重复。有什么方法可以将带注释的消费者方法已经反序列化到特定的类?
【问题讨论】:
标签:
spring-boot
spring-kafka
【解决方案1】:
不清楚你在问什么。
如果每个主题/事件类型有不同的@KafkaListener,并且使用JSON,框架会自动告诉消息转换器数据应该转换成的类型;见the documentation。
虽然从低级 Kafka 消费者和生产者的角度来看,序列化器和反序列化器 API 非常简单和灵活,但在使用 @KafkaListener 或 Spring Integration 时,您可能需要在 Spring 消息传递级别上获得更大的灵活性。为了让您轻松地与 org.springframework.messaging.Message 相互转换,Spring for Apache Kafka 提供了 MessageConverter 抽象,其中包含 MessagingMessageConverter 实现及其 JsonMessageConverter(和子类)自定义。您可以直接将 MessageConverter 注入 KafkaTemplate 实例,并使用 @KafkaListener.containerFactory() 属性的 AbstractKafkaListenerContainerFactory bean 定义。以下示例显示了如何执行此操作:...
在消费者端,可以配置一个JsonMessageConverter;它可以处理 byte[]、Bytes 和 String 类型的 ConsumerRecord 值,因此应与 ByteArrayDeserializer、BytesDeserializer 或 StringDeserializer 结合使用。 (byte[] 和 Bytes 更有效,因为它们避免了不必要的 byte[] 到 String 的转换)。如果您愿意,还可以配置反序列化器对应的 JsonMessageConverter 的特定子类。