【发布时间】:2021-12-22 12:51:59
【问题描述】:
我正在使用部署在 Kafka Connect 中的 Debezium MySQL 连接器来将 MySQL 更改流式传输到 Kafka 主题,并从中获取这些消息丰富数据并将数据推送到另一个 MySQL。
source 和 sink 都是 MySQL。
我的源表中有几列,列数据类型为 TIMESTAMP。
例如:create_time : 2021-10-06 09:32:46
我可以在 Kafka Message 中看到上述数据,如下所示
"create_time":"2021-10-06T09:32:46Z"
当消息加载到目标数据库(MySQL)时,上述字符串中的T和Z面临问题,其中数据类型也是TIMESTAMP。
我需要 kafka 消息中的 create_time 作为 "create_time":"2021-10-06 09:32:46" 没有 TZ
错误日志:
- 行导入失败并出现错误:(“日期时间值不正确:'2021-10-06T09:32:46Z' for column 'CREATE_TIME' at row 1”,1292)
我了解 Debezium 正在将 Timestamp 列数据转换为 ZonedDateTime,这可能会添加 TimeZone 信息。 (https://debezium.io/documentation/reference/1.7/connectors/mysql.html#mysql-temporal-types)
我在 Debezium 连接器配置中尝试了 org.apache.kafka.connect.transforms.TimestampConverter 但这没有帮助。
目前我在 Kafka Connect 配置中使用 org.apache.kafka.connect.json.JsonConverter。
注意事项:我正在批量处理从 Kafka 收到的多条消息,构建一个 csv 文件并使用 LOAD DATA 我将 csv 内容推送到目标数据库。
【问题讨论】:
标签: java mysql timestamp apache-kafka-connect debezium