【问题标题】:Send a json payload to Apache Kafka topic in Spring在 Spring 中向 Apache Kafka 主题发送 json 有效负载
【发布时间】:2019-04-22 15:21:42
【问题描述】:

我需要向 Apache Kafka 主题发送(发布)Json 有效负载,但我收到以下错误:- "message": "无法将 com.xyz.User 类的值转换为 value.serializer 中指定的 org.apache.kafka.common.serialization.StringSerializer 类"

Spring 还显示类转换异常:- java.lang.ClassCastException:com.xyz.User 无法转换为 java.lang.String

以下是我的模态、kafka 配置和控制器

public class User {
    private String firstname;
    private String email;


    public User() {}
    public User(String firstname, String email) {
        super();
        this.firstname = firstname;
        this.email = email;
    }
    public String getFirstname() {
        return firstname;
    }
    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }
    public String getEmail() {
        return email;
    }
    public void setEmail(String email) {
        this.email = email;
    }
    @Override
    public String toString() {
        return "UserModel [firstname=" + firstname + ", email=" + email + "]";
    }


}

卡夫卡配置

 package config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.xyz.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> producerFactory(){
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);

    }
     @Bean
        public KafkaTemplate<String, User> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

}

控制器

@RestController
@RequestMapping("/kafka")
public class UserController {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    private static String TOPIC = "kafka-producer";
    @PostMapping("/publish")
    public void getUserId(@RequestBody User user) {

        kafkaTemplate.send(TOPIC, user);

Json Payload 从邮递员发送

{
    "firstname" : "xyz",
    "email" : "xyz@gmail.com.com"

}

【问题讨论】:

  • 生产者是xml配置的吗?你能把那个也发一下吗?
  • @aurelius ,我正在使用 spring-boot ,我没有任何生产者的 xml 配置。
  • 那么你需要KafkaTemplate,用于发送消息
  • 是的,我正在尝试通过 kafkatemplate 发送 kafkaTemplate.send(TOPIC, user);
  • 你也可以发布那个代码吗,我想你没有改变你发送的消息类型,从字符串到用户

标签: java json apache-kafka kafka-producer-api spring-kafka


【解决方案1】:

以下对我有用。

public class UserController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
    ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
    String json = null;
    try {
        json = ow.writeValueAsString(user);
    } catch (JsonProcessingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }   
kafkaTemplate.send(TOPIC, json);


}

【讨论】:

  • 出于性能原因,将 ObjectWriter 声明移到方法之外,并且不要漂亮地打印消息
  • 不过,Spring 应该已经知道如何使用 Jackson 来转换你的对象 codenotfound.com/…
  • 是的。你不需要手动序列化。 JsonSerializer 在内部使用 ObjectMapper,因此它也可以与 User 对象一起使用。请参阅我的答案以更正您的配置和更多详细信息
【解决方案2】:

能够在我的本地设置中重现您的错误。

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String

如果我们查看日志,那么它会显示 value.serializer 值仍然指的是默认值 StringSerializer 而不是预期的 JsonSerializer 反过来说您的生产者配置没有生效。简而言之,您的 KafkaCOnfiguration 课程没有被推荐。

您的 KafkaConfiguration 类位于某个 config 包中,User 类位于某个 com.xyz 包中。因此,解决方案是确保它能够获取您的配置。很可能该包可能不会被扫描以获取配置/bean 定义。如果您将 KafkaConfiguration 移动到应用程序的根包,那么您的原始代码应该可以正常工作。

如果您说您的 KafkaTemplate 对象被注入,那么实际上并非如此。注入的那个是由 Spring kafka Autoconfiguration 定义的。

【讨论】:

    【解决方案3】:

    服务

    @Service
    public class KafkaSenderService {
    
      @Autowired
      private KafkaTemplate<String, JsonNode> kafkaTemplate;
    
      public void send(String kafkaTopic, JsonNode message) {
        kafkaTemplate.send(kafkaTopic, message);
      }
    }
    

    控制器

    for (JsonNode data: datas) {
       kafkaSender.send(topicName,data);
    }
    

    application.properties

    spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    

    【讨论】:

      猜你喜欢
      • 2018-04-05
      • 1970-01-01
      • 2019-06-17
      • 2012-07-24
      • 2022-12-09
      • 1970-01-01
      • 2021-10-29
      • 2020-03-30
      • 1970-01-01
      相关资源
      最近更新 更多