【发布时间】:2019-07-25 09:29:56
【问题描述】:
我想使用 spring-Kafka 库的 spring boot 配置的消费者来使用来自我的 Kafka 代理的消息,源是一个 JDBC 连接器,它负责从 MySQL 数据库中提取消息并且需要使用这些消息
下面我附上我的 application.yml 文件
server:
port: 9000
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
消费者 -
@KafkaListener(topics = "Sample-Topic", groupId = "group_id")
public void consume(String message) throws IOException {
System.out.println(message);
}
主题中的消息以 Avro 格式存储,因为我使用的是 JDBC 连接器。
我得到的是加密输出,而不是 JSON,一些输出消息只是纯字符串。
【问题讨论】:
标签: java spring-boot apache-kafka java-8 spring-kafka