【问题标题】:Spring kafka Consume multiple Message types in one consumerSpring kafka 在一个消费者中消费多种消息类型
【发布时间】:2016-12-27 18:49:39
【问题描述】:

我有多个生产者可以向一个 kafka 主题发送多种类型的事件。

而且我有一个必须消费所有类型消息的消费者。每种类型的消息都有不同的逻辑。

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<EventOne> event) {
    logger.info("event={}", event);
}

但在这种情况下,所有消息都来到这个方法,不仅仅是 EventOne

如果我实现了两种方法(针对每种类型的消息),那么所有消息都只使用一种方法。

如果我这样实现监听器:

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<?> event) {
    logger.info("event={}", event);
}

然后我得到异常: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-listener-1] 错误 org.springframework.kafka.listener.LoggingErrorHandler - 处理时出错:ConsumerRecord java.lang.IllegalArgumentException:无法识别的类型:[null]

请告诉我如何实现多类型消费者?

【问题讨论】:

    标签: java spring apache-kafka


    【解决方案1】:

    我找到了灵魂,它很简单。 所以,从问题中并不清楚,但我使用 jsonMessageConverter 像这样:

        @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
    

    默认情况下,Jackson 库不会为 JSON 添加类信息,因此它无法猜测我想要反序列化的类型。 解决方法是注解

    @JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class")
    

    将类信息添加到 json 字符串。 在消费者方面,我只是编写我的事件的基类

        @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void handleEvent(BasicEvent event) {
    
        if (event instanceof EventOne) {
            logger.info("type={EventOne}, event={}", event);
        } else {
            logger.info("type={EventTwo}, event={}", event);
        }
    }
    

    希望此信息对某人有所帮助。

    【讨论】:

      猜你喜欢
      • 2020-08-11
      • 1970-01-01
      • 2016-11-11
      • 2019-04-07
      • 2017-09-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-10
      相关资源
      最近更新 更多