【问题标题】:Cannot convert from [java.lang.String] to [com.example.demo.User]无法从 [java.lang.String] 转换为 [com.example.demo.User]
【发布时间】:2021-05-22 02:26:19
【问题描述】:

我正在研究 Spring Boot 和 Apache Kafka - 尝试使用用户定义的配置 -

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.Consumer.consume(com.example.demo.User) throws java.io.IOException]
Bean [com.example.demo.Consumer@7cd4a8cc]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2110) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2098) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1997) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1812) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.5.jar:2.6.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}], failedMessage=GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2065) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2047) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1984) ~[spring-kafka-2.6.5.jar:2.6.5]
    ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.demo.User] for GenericMessage [payload={"name":"Prateek","age":33}, headers={kafka_offset=7, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32375775, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=users, kafka_receivedTimestamp=1613708323636, __TypeId__=[B@165438c, kafka_groupId=group_id}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.3.jar:5.3.3]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:926) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.3.jar:5.3.3]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.3.3.jar:5.3.3]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.3.3.jar:5.3.3]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.6.5.jar:2.6.5]
    ... 13 common frames omitted

我们如何解决这个问题?

下面是我的代码-

用户.java

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String name;
    private int age;
}

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    // 1. Send string to Kafka
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // 2. Send User objects to Kafka
    @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
}

Producer.java

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendMessage(User user) {
        logger.info(String.format("#### -> Producing message -> %s", user.toString()));
        this.kafkaTemplate.send(TOPIC, user);
    }
}

KafkaConsumerConfig.java

@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${general.topic.group.id}")
    private String groupId;

    @Value(value = "${user.topic.group.id}")
    private String userGroupId;

    // 1. Consume string data from Kafka
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // 2. Consume user objects from Kafka
    public ConsumerFactory<String, User> userConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
    

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
}

Consumer.java

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Producer.class);

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(User user) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", user.toString()));
    }
}

KafkaController.java

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestBody User user) {
        this.producer.sendMessage(user);
    }
}

KafkaExampleApplication.java

@SpringBootApplication
public class KafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }
}

application.properties

server.port=9000

kafka.bootstrapAddress=localhost:9092
general.topic.group.id=group_id
user.topic.group.id=MyGrpId

【问题讨论】:

    标签: spring spring-boot apache-kafka


    【解决方案1】:

    在您的 Producer 类中,您需要将 kafkaTemplate 重命名为 userKafkaTemplate,它通过您要使用的 JSON 序列化程序连接到 User producer factory。

    事实上,如果你不使用其他模板和生产者工厂,你可能应该删除它们

    【讨论】:

    • 这并不能解决问题,而且变量名可以是任何东西。泛型类型足以识别 Spring boot 连接哪个 bean。
    • @Sayan 是的。它是按名称自动接线的。 “Spring 使用 bean 的名称作为默认限定符值”baeldung.com/spring-autowire 如果您有其他解决方案,请随时提供您自己的答案
    【解决方案2】:
    1. 在 bean userConsumerFactory()KafkaConsumerConfig.java 配置中,您通过属性用户的 @Value 接线使用 userGroupId="MyGrpId" .topic.group.id=MyGrpId。但随后在 Consumer.java 的 @KafkaListener 注解处,指定了 groupId = "group_id"。

    2. 在同一个注解中,最好指定containerFactory而不是groupId,比如@KafkaListener(topics = "users", containerFactory = "userKafkaListenerContainerFactory")。这样,注释将通过 ConcurrentKafkaListenerContainerFactory 准确连接所需的 ConsumerFactory bean 配置。

    3. 另外,我同意@OneCricketeer 的观点,在 Producer 中,您应该将自动装配的 kafkaTemplate 限定为生成 User 对象的那个,您稍后想要使用可反序列化为 User 对象的 JSON,如下所示:

      @Autowired

      @Qualifier("userKafkaTemplate")

      私有 KafkaTemplate kafkaTemplate;

    更多详情:Baeldung - Apache Kafka with Spring

    【讨论】:

      【解决方案3】:

      尝试将其添加到您的 @KafkaListener 注释中,就像在 @Lotzy 答案的第二个子句中一样:

      containerFactory = userKafkaListenerContainerFactory
      

      【讨论】:

        猜你喜欢
        • 2017-06-19
        • 2017-01-31
        • 1970-01-01
        • 1970-01-01
        • 2014-12-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多