【问题标题】:Kafka JDBC Sink handle array datatypeKafka JDBC Sink 句柄数组数据类型
【发布时间】:2021-05-27 09:56:24
【问题描述】:

我知道 Kafka JDBC Sink 连接器对于数组数据类型有一些缺点。但是,是否可以将 Sink 连接器与支持数组数据类型的简单 Kafka 连接器结合使用。如何从 Kafka 配置中过滤并将它们切换为简单的 Kafka 连接器配置 简单的 Kafka 配置是什么意思? Kafka Connect 如何支持数组字段

name: topic_name
type: array
item: Topic file

这是否可能以字符串而不是数组的形式消耗到数据库中

"fields":[{
  "name":"item_id",
  "type":{
     "type":"array",
     "items":["null", "string"]
  },
"default":[]
}]
}

【问题讨论】:

    标签: jdbc apache-kafka avro apache-kafka-connect


    【解决方案1】:

    Kafka Connect 框架本身并没有暴露类型的限制,它在 JDBC 接收器的源代码中拒绝了数组。

    有一个出色的 PR 支持 Postgres - https://github.com/confluentinc/kafka-connect-jdbc/pull/805

    不清楚您所说的“简单”是什么意思,但如果您想使用不同的连接器,则需要安装它,然后更改类。例如,也许 MongoDB 接收器处理数组。我知道 S3 和 HDFS 接收器确实...

    是否可以将 Sink 连接器与简单的 Kafka 连接器结合使用

    同样,不确定您的意思是什么,但连接器通常不会“链接在一起”。虽然您可以使用带有转换的 MirrorMaker2 来有效地完成与 Kafka Streams 相同的工作,但最好使用更合适的工具来实现这一点

    这可能会以字符串而不是数组的形式消耗到数据库

    当然,如果消息字段实际上是一个字符串。正如建议的那样,您需要在接收器连接器使用它之前处理消息

    【讨论】:

    • 所以我尝试使用 Debezium sink 连接器,因为我读到它支持它,但是当我部署它时,我收到一个 500 错误,需要一些连接器类来支持 Debezium 并且 JDBC Sink 是唯一的选择。是否有更多配置,例如在配置的 Dockerfile 或 yaml 文件/worker 属性中使其工作stackoverflow.com/questions/66893323/… 是否可以提取数组字段并让其他字段读入数据库
    • Debezium 是一个源,而不是一个接收器,并且您的错误与连接无关,而是您命名的 docker 映像
    • 我明白了,我将 docker 镜像命名为 img.dev/kafka/kafka-sink-connect:{{TAG}},所以我需要对 Docker 和 worker 文件进行更多配置来支持它。我尝试添加一个 HOIST,然后将 SMT 展平为 Kafka 文档docs.confluent.io/platform/current/connect/transforms/…,因为它说使用 Struct 包装数据并破坏字段,我让它运行但 JDBC Sink 没有数据进入。关于为什么的任何建议?当我尝试 HoiseField$Value 时,我看到 INFO Setting offset for partition to topic
    • 如果我将字段从数组类型更改为结构,并且我使用 HOIST 并展平 SMT,它将在 db 中读取
    • 根据文档,Flatten 只适用于 Structs 和 maps,它不会遍历数组。您需要查看日志以了解未写入数据的原因(或尝试运行哪些查询)
    猜你喜欢
    • 2022-06-28
    • 2020-06-02
    • 1970-01-01
    • 2019-10-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-15
    • 2021-11-20
    相关资源
    最近更新 更多