【发布时间】:2019-02-18 13:16:10
【问题描述】:
我正在尝试使用 MySQL 表中的记录,该表包含 3 列 (Axis, Price, lastname),其数据类型分别为 (int, decimal(14,4), varchar(50))。
我插入了一条记录,其中包含以下数据(1, 5.0000, John)。
以下 Java 代码(使用 Confluent 平台中 MySQL 连接器创建的主题中的 AVRO 记录)读取十进制列:价格,作为 java.nio.HeapByteBuffer 类型,因此我无法达到该列的值当我收到它时。
有没有办法将接收到的数据提取或转换为 Java 十进制或双精度数据类型?
这是 MySQL 连接器属性文件:-
{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "Axis",
"tasks.max": "1",
"table.whitelist": "ticket",
"mode": "incrementing",
"topic.prefix": "mysql-",
"name": "mysql-source",
"validate.non.null": "false",
"connection.url": "jdbc:mysql://localhost:3306/ticket?
user=user&password=password"
}
}
这里是代码:-
public static void main(String[] args) throws InterruptedException,
IOException {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
String topic = "sql-ticket";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value().get("Price"));
}
}
} finally {
consumer.close();
}
}
【问题讨论】:
-
您的消费者代码正在使用旧的、已弃用的消费者 API...您从哪里获得该代码?
-
@cricket_007 我正在使用 confluent 4.0.0 和此版本中的 java 示例docs.confluent.io/4.0.0/schema-registry/docs/… 如果我使用最新版本,它会解决问题吗?
-
嗯,文档的那部分已经过时了...我可以提出问题并进行更新,但请看这个。具体来说,客户端与 Kafka 交互不再需要 Zookeeper confluent.io/blog/…
-
要不然试试
avroRecord.get("Price").getFloat()) -
1) 我没有使用 JDBC 源连接器,所以很遗憾我不知道适合您的解决方案。必须有某种
HeapByteBuffer方法将字节转换为您想要的类型。 2) 我通常使用通过 Avro Maven 插件构建的 SpecificRecord avro 类型,而不是普通的 GenericRecords。
标签: java mysql apache-kafka apache-kafka-connect confluent-platform