【Question title】: Spring-Kafka Consumer KafkaListener cannot convert GenericMessage to Java Object
【Posted】: 2017-12-20 01:45:07
【Question】:

I publish events of a custom Java type, InventoryEvent, through the kafka-rest service of Confluent Platform 3.3.0 running on a CentOS 7 instance, using the following two steps:

Command to POST the JSON event to kafka-rest:

curl -X POST -H "Content-Type:application/vnd.kafka.json.v2+json" --data '{"records" : [{"value" : {"id":1231, "eventType": "inventory.transaction", "qtyLevel" : 2223, "qtyReq" : 2345}}]}' "http://localhost:8082/topics/inventory"

Subscribe the consumer instance to the topic:

curl -X POST -H "Content-Type:application/vnd.kafka.v2+json" --data '{"topics" : ["inventory"]}' http://localhost:8082/consumers/inventory_consumers/instances/consumer_1/subscription
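
For readers who would rather drive the first step from Java than from curl, here is a minimal sketch using only the JDK. It is not part of the original post: the class name InventoryRestPublisher and its methods are hypothetical, and the URL, topic, and content type are simply copied from the curl command above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not from the original post.
class InventoryRestPublisher {

    // Wraps one JSON value in the {"records":[{"value":...}]} envelope used by the curl command.
    static String wrapRecord(String valueJson) {
        return "{\"records\":[{\"value\":" + valueJson + "}]}";
    }

    // POSTs the envelope to <baseUrl>/topics/<topic> and returns the HTTP status code.
    static int post(String baseUrl, String topic, String valueJson) throws IOException {
        byte[] body = wrapRecord(valueJson).getBytes(StandardCharsets.UTF_8);
        HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/topics/" + topic).openConnection();
        try {
            conn.setRequestMethod("POST");
            // Same content type as in the curl command for JSON records.
            conn.setRequestProperty("Content-Type", "application/vnd.kafka.json.v2+json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body);
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes kafka-rest is reachable on localhost:8082, as in the question.
        String event = "{\"id\":1231,\"eventType\":\"inventory.transaction\","
                + "\"qtyLevel\":2223,\"qtyReq\":2345}";
        System.out.println("HTTP status: " + post("http://localhost:8082", "inventory", event));
    }
}
```

The envelope is built by plain string concatenation only to keep the sketch dependency-free; a JSON library would be the safer choice for values that are not already valid JSON.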

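Next, I consume the events sent to the Kafka broker from a Spring-Kafka application. It should take the JSON and convert it back to the Java type in a consumer listener method annotated with @KafkaListener, as shown below: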

public class InventoryEventReceiver {

    private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders(
            @Payload InventoryEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) String offset
            ) {

        System.out.println("EVENT HAS BEEN RECEIVED ");
        System.out.println(event.toString());


        ObjectMapper objectMapper = new ObjectMapper();
        String invEventInString = null;
        try {
            invEventInString = objectMapper.writeValueAsString(event);
            System.out.println(invEventInString);

        } catch (IOException e) {
            e.printStackTrace();
        } 

        latch.countDown();
    }
}

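But when I try to consume the message with the receiver code above, the KafkaListenerContainer logs the error shown at the end of this question.

Other listener method definitions I tried, with the same error:

Listening with an InventoryEvent object: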

@KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders(
            InventoryEvent event )

Listening with a ConsumerRecord (taking a hint from the error log):

@KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<?,?> record)

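My receiver configuration below uses InventoryEvent as the value type. I also tried changing it to String and adding a StringJsonMessageConverter with

containerFactory.setMessageConverter(new StringJsonMessageConverter());

but it gave the same error.

Am I missing some basic Spring-Kafka configuration, such as a MessageConverter or MessageListener, or do I have to implement a custom MessageConverter from scratch to deserialize the JSON into the Java type InventoryEvent?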

@EnableKafka
@Configuration
public class InventoryReceiverConfig {

    @Bean
    public static ConsumerFactory<String, InventoryEvent> consumerFactory() { 
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), 
                new JsonDeserializer<>(InventoryEvent.class));
    }

    @Bean
    public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(3); 
        containerFactory.getContainerProperties().setPollTimeout(3000);
        return containerFactory;
    }

    @Bean
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        return consumerProps;
    }

    @Bean
    public InventoryEventReceiver receiver() {
        return new InventoryEventReceiver();
    }

}

Error log:

2017-12-19 13:49:08.671 ERROR 16965 --- [fka-listener-23] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 48, CreateTime = 1513691348668, checksum = 537414172, serialized key size = -1, serialized value size = 77, key = null, value = {id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [com.psl.kafka.spring.InventoryEventReceiver@3ecc1b0b]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:156) ~[spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) [spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) [spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) [spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981) [spring-kafka-1.1.1.RELEASE.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}], failedMessage=GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
        ... 10 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.psl.kafka.spring.InventoryEvent] to [java.lang.String] for GenericMessage [payload={id=1231, eventType='inventory.transaction', qtyReq='2345', qtyLevel='2223'}, headers={kafka_offset=48, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
        at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.1.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:152) ~[spring-kafka-1.1.1.RELEASE.jar:na]
        ... 9 common frames omitted

2017-12-19 13:49:28.869  INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-12-19 13:49:28.889  INFO 16965 --- [o-8080-exec-113] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms

【Question comments】:

    Tags: spring-boot apache-kafka spring-amqp spring-kafka


    【Answer 1】:

    Look at your stack trace:

    Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(java.lang.String,java.lang.String,java.lang.Integer,int,java.lang.String)]
    

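    So the method signature at runtime is listenWithHeaders(String, String, Integer, int, String),

    but the one you showed us is completely different. Please make sure the correct code is actually being used at runtime.

    If you have a JsonDeserializer, you really don't need the StringJsonMessageConverter, but you do have to use the correct method signature...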
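
The mismatch described above can be reproduced outside Spring with plain reflection. The sketch below is an illustration only: every class in it is a hypothetical stand-in, not the code from the question, and it only approximates what Spring's payload resolution does (it ignores message converters). It compares the payload type that a JsonDeserializer for InventoryEvent would hand over against the first parameter of the listener method, once for the signature in the stack trace and once for the signature shown in the question.

```java
import java.util.Arrays;

// Hypothetical stand-ins for illustration only; none of these classes is from the original post.
class ListenerSignatureCheck {

    static class InventoryEvent { }

    // Mimics the signature reported in the stack trace: the payload parameter is a String.
    static class StaleReceiver {
        public void listenWithHeaders(String event, String topic, Integer key,
                                      int partition, String offset) { }
    }

    // Mimics the signature shown in the question: the payload parameter is an InventoryEvent.
    static class ExpectedReceiver {
        public void listenWithHeaders(InventoryEvent event, String topic, Integer key,
                                      int partition, String offset) { }
    }

    // Returns the first parameter type of the first public method with the given name.
    static Class<?> payloadType(Class<?> receiver, String methodName) {
        return Arrays.stream(receiver.getMethods())
                .filter(m -> m.getName().equals(methodName))
                .filter(m -> m.getParameterCount() > 0)
                .map(m -> m.getParameterTypes()[0])
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No method named " + methodName));
    }

    // True when a payload of the given class can be passed to the listener without conversion.
    static boolean acceptsPayload(Class<?> receiver, String methodName, Class<?> payloadClass) {
        return payloadType(receiver, methodName).isAssignableFrom(payloadClass);
    }

    public static void main(String[] args) {
        // prints "false": a String parameter cannot receive an InventoryEvent payload
        System.out.println(acceptsPayload(StaleReceiver.class, "listenWithHeaders", InventoryEvent.class));
        // prints "true": the declared parameter type matches the deserialized payload
        System.out.println(acceptsPayload(ExpectedReceiver.class, "listenWithHeaders", InventoryEvent.class));
    }
}
```

Running the same check against the receiver class actually deployed (for example, by logging payloadType at startup) is one way to confirm whether a stale build with the String signature is still on the classpath.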

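    【Comments】:

    • I have shown the listener method signature exactly as it appears in the stack trace. In the InventoryEventReceiver class I annotated listenWithHeaders with @KafkaListener; it has the parameters (String, String, Integer, int, String) as: InventoryEvent event, (KafkaHeaders.RECEIVED_TOPIC) String topic, (KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, (KafkaHeaders.RECEIVED_PARTITION_ID) int partition, (KafkaHeaders.OFFSET) String offset.
    • Also, I gave two other listener method signatures that I tried, with the same error. I am not using StringJsonMessageConverter, because I am doing JsonSerialize/JsonDeserialize for the value part of the message.
    • By "the correct method", do you mean a custom MessageConverter for the JsonDeserializer? If so, could you give a short example?
    • I checked the link baeldung.com/spring-kafka, but the article does not mention any converter. It only covers handling custom messages.
    • GitHub is a good place to share code publicly. But remember: it should be as simple as possible and focus only on the problem. It is hard for us to understand your business logic. Thanks.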