【发布时间】:2019-04-22 15:21:42
【问题描述】:
我需要向 Apache Kafka 主题发送(发布)Json 有效负载,但我收到以下错误:- "message": "无法将 com.xyz.User 类的值转换为 value.serializer 中指定的 org.apache.kafka.common.serialization.StringSerializer 类"
Spring 还显示类转换异常:- java.lang.ClassCastException:com.xyz.User 无法转换为 java.lang.String
以下是我的模态、kafka 配置和控制器
public class User {
private String firstname;
private String email;
public User() {}
public User(String firstname, String email) {
super();
this.firstname = firstname;
this.email = email;
}
public String getFirstname() {
return firstname;
}
public void setFirstname(String firstname) {
this.firstname = firstname;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public String toString() {
return "UserModel [firstname=" + firstname + ", email=" + email + "]";
}
}
卡夫卡配置
package config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.xyz.User;
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory<String, User> producerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
控制器
@RestController
@RequestMapping("/kafka")
public class UserController {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
kafkaTemplate.send(TOPIC, user);
Json Payload 从邮递员发送
{
"firstname" : "xyz",
"email" : "xyz@gmail.com.com"
}
【问题讨论】:
-
生产者是xml配置的吗?你能把那个也发一下吗?
-
@aurelius ,我正在使用 spring-boot ,我没有任何生产者的 xml 配置。
-
那么你需要KafkaTemplate,用于发送消息
-
是的,我正在尝试通过 kafkatemplate 发送 kafkaTemplate.send(TOPIC, user);
-
你也可以发布那个代码吗,我想你没有改变你发送的消息类型,从字符串到用户
标签: java json apache-kafka kafka-producer-api spring-kafka