【发布时间】:2022-01-08 17:01:19
【问题描述】:
这是我之前的问题:- importing-json-data-into-postgres-using-kafka-jdbc-sink-connector
当我生成模式和有效负载格式的数据时,我能够加载 json 数据。但是,对我来说,不可能将模式分配给每条记录。所以,我开始寻找其他解决方案并找到Schema Inferencing for JsonConverter。根据文档,我禁用了 value.converter.schemas.enable 并启用了 value.converter.schemas.infer.enable 但我仍然面临同样的错误
即, 原因:org.apache.kafka.connect.errors.ConnectException:接收器连接器“load_test”配置为“delete.enabled=false”和“pk.mode=none”,因此需要非空记录结构值和非空结构架构,但在 (topic='dup_emp',partition=0,offset=0,timestamp=1633066307312) 找到记录,其中包含 HashMap 值和空值架构。
我的配置:-
curl -X PUT http://localhost:8083/connectors/load_test/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/somedb",
"connection.user":"user",
"connection.password":"passwd",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"value.converter.schemas.infer.enable": "true",
"tasks.max" : "1",
"topics":"dup_emp",
"table.name.format":"dup_emp",
"insert.mode":"insert",
"quote.sql.identifiers":"never"
}'
我已经通过sink_config_options这里根据我的理解,我需要用key生成记录,其中key包含主键字段的结构,需要设置pk_mode:record_key和delete.enabled:true
如果我理解错误,请纠正我。如果我的理解是正确的,我们如何使用 struct 类型的键(包含所有主键)生成记录,最后,如何使它成功地从主题中填充 postgres 中的数据。
【问题讨论】:
标签: json postgresql apache-kafka