【问题标题】:Loading data in plain json format to postgresql using kafka jdbc sink connector使用kafka jdbc sink连接器将纯json格式的数据加载到postgresql
【发布时间】:2022-01-08 17:01:19
【问题描述】:

这是我之前的问题:- importing-json-data-into-postgres-using-kafka-jdbc-sink-connector

当我生成模式和有效负载格式的数据时,我能够加载 json 数据。但是,对我来说,不可能将模式分配给每条记录。所以,我开始寻找其他解决方案并找到Schema Inferencing for JsonConverter。根据文档,我禁用了 value.converter.schemas.enable 并启用了 value.converter.schemas.infer.enable 但我仍然面临同样的错误

即, 原因:org.apache.kafka.connect.errors.ConnectException:接收器连接器“load_test”配置为“delete.enabled=false”和“pk.mode=none”,因此需要非空记录结构值和非空结构架构,但在 (topic='dup_emp',partition=0,offset=0,timestamp=1633066307312) 找到记录,其中包含 HashMap 值和空值架构。

我的配置:-

curl -X PUT http://localhost:8083/connectors/load_test/config \
-H "Content-Type: application/json" \
-d '{
 "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
 "connection.url":"jdbc:postgresql://localhost:5432/somedb",
 "connection.user":"user",
 "connection.password":"passwd",
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable":"false",
 "value.converter.schemas.infer.enable": "true",
 "tasks.max" : "1",
 "topics":"dup_emp",
 "table.name.format":"dup_emp",
 "insert.mode":"insert",
 "quote.sql.identifiers":"never"
}'

我已经通过sink_config_options这里根据我的理解,我需要用key生成记录,其中key包含主键字段的结构,需要设置pk_mode:record_keydelete.enabled:true

如果我理解错误,请纠正我。如果我的理解是正确的,我们如何使用 struct 类型的键(包含所有主键)生成记录,最后,如何使它成功地从主题中填充 postgres 中的数据。

【问题讨论】:

    标签: json postgresql apache-kafka


    【解决方案1】:

    不可能将架构分配给每条记录

    那么就不能使用这个连接器,因为它需要一个模式才能知道存在哪些字段和类型。

    您链接到的 KIP 处于“讨论中”,具有未分配的开放 JIRA,未实施。

    替代方法是不使用 JSON,而是使用结构化二进制格式,例如 Confluent 提供的格式(Avro 或 Protobuf)。你可以use KSQL before consuming in Connect to do this translation(需要运行 Confluent Schema Registry)

    否则,您需要编写自己的 Converter(或 Transform)并将其添加到 Connect 类路径中,以便它返回一个 Struct

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-01-22
      • 2019-06-04
      • 2019-06-15
      • 1970-01-01
      • 2020-07-31
      • 2019-06-22
      • 2021-05-05
      • 1970-01-01
      相关资源
      最近更新 更多