【Question title】: Debezium Kafka CDC connector making the key Avro even when the converter is StringConverter
【Posted】: 2020-08-16 19:21:29
【Question description】:

Here is my connector configuration:

curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-cdc-CUSTOMER_DETAILS-007",
  "config": {
    "tasks.max":"2",
    "poll.interval.ms":"500",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "dbnode",
    "database.port": "3306",
    "database.user": "**********",
    "database.password": "###########",
    "database.server.name": "dbnode",
    "database.whitelist": "device_details",
    "database.history.kafka.bootstrap.servers": "**********:9092",
    "database.history.kafka.topic": "schema-changes.device_details",
    "include.schema.changes":"true",
    "table.whitelist":"device_details.tb_customermst",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://************:8081",
    "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable":"false",
    "internal.value.converter.schemas.enable":"false"
  }
}' | jq '.'

When consuming the data from ksql, it looks like this:

ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "klara.djokic007@iillii.org", "mobilenumber": "+1 (480) 361-5311", "customertype": "Commercial", "emailverified": 1, "mobileverified": 1, "city": "Gilbert", "postcode": "85296", "address": "3426 E Elgin St", "latitude": 33.29840528, "longitude": -111.71571314, "UPDATE_TS": "2020-05-02T08:38:33Z"}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "Device_Details", "ts_ms": 1588408713000, "snapshot": "false", "db": "device_details", "table": "tb_customermst", "server_id": 1, "gtid": "98557612-65ba-11ea-8dc4-000c29bcb2b4:6", "file": "binlog.000044", "pos": 4417, "row": 0, "thread": 8, "query": null}, "op": "c", "ts_ms": 1588408713761, "transaction": null}

The key is Struct{customerid=10001}, but I want the key to be just 10001.

How can I achieve this?

When using the ValueToKey + ExtractField$Key SMTs, the Connect log shows the following error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:315)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
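A likely cause of this NullPointerException: ValueToKey copies the named fields from the top level of the record value into the key, but Debezium's value is an envelope whose row data sits under "after". A rough Python analogue of that lookup (the real code is Java, in org.apache.kafka.connect.transforms.ValueToKey; this is only an illustrative sketch):

```python
def value_to_key(value, fields):
    """Rough analogue of Connect's ValueToKey SMT: build a new key
    from top-level fields of the record value."""
    key = {}
    for f in fields:
        if f not in value:
            # In the Java version, valueSchema.field(f) returns null for a
            # missing field, and dereferencing it throws the NPE seen at
            # ValueToKey.java:85. Here we raise explicitly instead.
            raise LookupError(f"field {f!r} not found at top level of value")
        key[f] = value[f]
    return key

# Debezium envelope: the row columns are nested under "after".
envelope = {"before": None,
            "after": {"customerid": 10001, "firstname": "Klara"},
            "op": "c"}

value_to_key(envelope["after"], ["customerid"])  # {'customerid': 10001}
# value_to_key(envelope, ["customerid"])  # fails: "customerid" is nested
```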

Can anyone tell me what to do to get the key as 10001?

Thanks in advance.

PS: I am using Confluent Platform 5.4.0 and Debezium Connector for MySQL 1.1.0.

【Question discussion】:

    Tags: mysql apache-kafka apache-kafka-connect debezium confluent-platform


    【Solution 1】:

    You're almost there. You need to use the ExtractField$Key transform on its own (i.e. without ValueToKey) to lift the value out of the key struct.

    "transforms":"extractKeyfromStruct",
    "transforms.extractKeyfromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyfromStruct.field":"customerid",
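    To see why the literal text Struct{customerid=10001} appeared in the first place, note that StringConverter serializes the key by calling toString() on whatever object it receives; ExtractField$Key replaces the whole key struct with one of its fields before the converter runs. A rough Python analogue (the real classes are Java, org.apache.kafka.connect.data.Struct and the ExtractField transform; this is only an illustrative sketch):

```python
class Struct:
    """Minimal stand-in for org.apache.kafka.connect.data.Struct."""
    def __init__(self, **fields):
        self._fields = fields

    def get(self, name):
        return self._fields[name]

    def __str__(self):
        body = ",".join(f"{k}={v}" for k, v in self._fields.items())
        return f"Struct{{{body}}}"

def extract_field_key(key, field):
    # ExtractField$Key: replace the key struct with a single field's value.
    return key.get(field)

def string_converter(obj):
    # StringConverter writes str(obj) as the message key bytes.
    return str(obj)

key = Struct(customerid=10001)
string_converter(key)                                    # 'Struct{customerid=10001}'
string_converter(extract_field_key(key, "customerid"))   # '10001'
```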
    

    【Discussion】:

    • Not clear what "on its own" means... do you mean that in the connector config I should use ExtractField$Key without the ValueToKey SMT?
    • Still getting the same error as before, but this time for ExtractField: Caused by: java.lang.NullPointerException at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    • How can I do this for all tables with just one connector?