【问题标题】:How to overload a KafkaListener method in Spring Boot如何在 Spring Boot 中重载 KafkaListener 方法
【发布时间】:2019-12-21 05:51:06
【问题描述】:

我使用 Spring Kafka 已经有一段时间了,但直到最近才遇到“重载”单个 Kafka 主题的需求。考虑下面的代码。

@KafkaListener(topics = "my-topic")
public void handleAsString(@Payload @Valid String message) {
    ...
}

@KafkaListener(topics = "my-topic")
public void handleAsUser(@Payload @Valid User user) {
    ...
}

这是使用@KafkaHandler 实现的最佳方式吗?我只在收到主题时执行的两种方法都成功,但希望将其视为标准重载方法。

【问题讨论】:

    标签: java spring spring-kafka


    【解决方案1】:

    一般来说,Kafka 的黄金法则是对一种类型的数据流使用一个topic。因此,如果您有不同类型的数据通过同一流进入,您可能需要重新考虑该方法并将不同类型的消息拆分到不同的 Kafka Topics 并为它们编写单独的 consumers

    如果您必须在一个主题中执行此操作,我会说将消息作为字符串接收,然后根据某些标准(例如每个说是否存在密钥)对其进行反序列化。

    假设两条消息:

    • “你好!” (字符串消息)
    • "{"id": 1, "name": \"john\", "age": 26}"(序列化用户消息)

    下面是一个示例

    // maybe make a bean of this
    private final ObjectMapper mapper = new ObjectMapper();
    
    @KafkaListener(topics = "my-topic")
    public void handleAsUser(@Payload String message) throws IOException {
    
        if (message.contains("age")){
    
            User userMessage = mapper.readValue(message, User.class);
    
            // do something when a user message is received
            return;          
        }
        System.out.println("The message is of type String");
    }
    

    【讨论】:

      猜你喜欢
      • 2019-09-05
      • 2022-01-15
      • 1970-01-01
      • 2019-01-20
      • 2018-12-21
      • 2022-08-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多