【问题标题】:How to overload a KafkaListener method in Spring Boot如何在 Spring Boot 中重载 KafkaListener 方法
【发布时间】:2019-12-21 05:51:06
【问题描述】:
我使用 Spring Kafka 已经有一段时间了,但直到最近才遇到“重载”单个 Kafka 主题的需求。考虑下面的代码。
@KafkaListener(topics = "my-topic")
public void handleAsString(@Payload @Valid String message) {
...
}
@KafkaListener(topics = "my-topic")
public void handleAsUser(@Payload @Valid User user) {
...
}
这是使用@KafkaHandler 实现的最佳方式吗?我只在收到主题时执行的两种方法都成功,但希望将其视为标准重载方法。
【问题讨论】:
标签:
java
spring
spring-kafka
【解决方案1】:
一般来说,Kafka 的黄金法则是对一种类型的数据流使用一个topic。因此,如果您有不同类型的数据通过同一流进入,您可能需要重新考虑该方法并将不同类型的消息拆分到不同的 Kafka Topics 并为它们编写单独的 consumers。
如果您必须在一个主题中执行此操作,我会说将消息作为字符串接收,然后根据某些标准(例如每个说是否存在密钥)对其进行反序列化。
假设两条消息:
- “你好!” (字符串消息)
- "{"id": 1, "name": \"john\", "age": 26}"(序列化用户消息)
下面是一个示例
// maybe make a bean of this
private final ObjectMapper mapper = new ObjectMapper();
@KafkaListener(topics = "my-topic")
public void handleAsUser(@Payload String message) throws IOException {
if (message.contains("age")){
User userMessage = mapper.readValue(message, User.class);
// do something when a user message is received
return;
}
System.out.println("The message is of type String");
}