【问题标题】:MySQL Sink Connector - JsonConverter - DataException: Unknown schema type: nullMySQL 接收器连接器 - JsonConverter - DataException:未知模式类型:null
【发布时间】:2021-10-03 20:41:24
【问题描述】:

我已创建 MySQL sink 连接器并成功运行,我看到日志并得到 200 响应,但 sink 连接器无法将数据推送到 mysql db(数据在主题中可用)

{ 
"name":"mysql-sink-connector", 
"config":{ "tasks.max":"2", 
"batch.size":"1000", 
"batch.max.rows":"1000", 
"poll.interval.ms":"500", 
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", 
"connection.url":"jdbc:mysql://mysql.azure.com:3306/db_test_dev", 
"table.name.format":"tbl_clients_merchants",
"topics":"createorder", 
"connection.user":"user", 
"connection.password":"password", 
"auto.create":"true", 
"auto.evolve":"true", 
"value.converter":"org.apache.kafka.connect.json.JsonConverter", 
"value.converter.schemas.enable":"true", 
"key.converter":"org.apache.kafka.connect.json.JsonConverter", 
"key.converter.schemas.enable":"true" 
}}

遇到错误

[2021-07-29 09:41:03,157] ERROR WorkerSinkTask{id=jdbc-mysql-sink-connector-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler 
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: null 
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743 
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 13 more 
[2021-07-29 13:26:11,347] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178) 

【问题讨论】:

  • 该输出不包含错误。您需要显示您的数据库日志以证明正在发出请求。请edit你的问题而不是使用cmets
  • @OneCricketeer 我更新了下面提到的 URL 中的所有代码和错误日志,请您帮忙解决这个问题。网址:issues.apache.org/jira/browse/KAFKA-13163
  • 您无法打开 JIRAs for MySQL 连接器,因为那是 Confluent 产品,而不是 Apache。话虽这么说,您的配置显然使用了FileStreamSinkConnector,它不接受connection.url 或任何JDBC 属性,所以我很困惑您的问题是什么,我在这里问您edit 您的问题,而不是创建一个JIRA

标签: mysql jdbc apache-kafka apache-kafka-connect


【解决方案1】:

正如错误提示,您在主题中有空墓碑记录,由于没有字段名称,因此无法映射到任何 MySQL 列。

选项 1) 修复您的生产者不发送这些消息

选项 2)Filter them in Connect

例如,

"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate" : "DropNull",

"predicates" : "DropNull",
"predicates.DropNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-06-02
    • 1970-01-01
    • 1970-01-01
    • 2014-08-17
    • 1970-01-01
    • 1970-01-01
    • 2020-03-28
    • 2023-04-05
    相关资源
    最近更新 更多