【问题标题】:Kafka Connect JDBC Sink quote.sql.identifiers not workingKafka Connect JDBC Sink quote.sql.identifiers 不工作
【发布时间】:2020-09-25 04:43:21
【问题描述】:

我正在尝试使用 Kafka Connect 将数据从旧的 DB2 数据库同步到使用 JDBC 源和接收器连接器的 Postgres 数据库。它工作正常,但前提是我对用于表名的情况非常严格。

例如,我在 DB2 中有一个名为 ACTION 的表,它也存在于 Postgres 中,具有相同的列等。唯一的区别是在 DB2 中它是大写的 ACTION,而在 Postgres 中它是小写的 action

这是一个有效的接收器文件:

{
    "name": "jdbc_sink_pg_action",
    "config": {
        "_comment": "The JDBC connector class",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        "_comment": "How to serialise the value of keys ",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",


        "_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "_comment": " --- JDBC-specific configuration below here  --- ",

        "_comment": "JDBC connection URL.",
        "connection.url": "jdbc:postgresql://localhost:5435/postgres",
        "connection.user": "postgres",
        "connection.password": "*****",

        "topics": "ACTION",
        "table.name.format": "action",

        "_comment": "The insertion mode to use",
        "insert.mode": "upsert",

        "_comment": "The primary key mode",
        "pk.mode": "record_value",

        "_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
        "pk.fields": "ACTION_ID",

        "quote.sql.identifiers": "never"
    }
}

这没问题,但它不是很灵活。例如,我有许多其他表,我也想同步它们,但我不想为每个表创建一个连接器文件。所以我尝试使用:

"table.name.format": "${topic}",

当我这样做时,当我尝试加载我的接收器连接器时,我在日志中收到以下错误:

引起:org.apache.kafka.connect.errors.ConnectException:表“ACTION” 丢失并且自动创建被禁用

所以在我看来"quote.sql.identifiers": "never" 实际上并没有工作,否则接收器连接器正在执行的查询将不被引用,并且它允许任何情况(它会转换为更低)。

为什么这不起作用?如果我只使用 ACTION 作为 table.name.format,我会得到相同的结果。

【问题讨论】:

    标签: postgresql apache-kafka apache-kafka-connect


    【解决方案1】:

    您的 PostgreSQL 表名称 (action) 不等于主题名称 (ACTION)。 Kafka Connect JDBC 连接器usesgetTables() 方法来检查表是否存在,其中tableNamePattern 参数区分大小写(根据文档:must match the table name as it is stored in the database)。

    您可以使用从Kafka Connect Common Transformations 转换而来的ChangeTopicCase

    【讨论】:

    • 感谢您的回复。我会试试的!所以我可以在我的接收器文件中使用它,它会在 kafka connect 读取 table.name.format 中的“$ {topic}”之前获取主题并更改它?我也可以使用 RegexRouter 删除前缀,我认为这些可以在接收器上正常工作? docs.confluent.io/current/connect/transforms/regexrouter.html
    • 是的,SMT 在将出站消息发送到接收器连接器之前对其进行转换。而且你可以一起使用 RegexRouter SMT,因为 Kafka Connect 支持链式转换docs.confluent.io/current/connect/transforms/…
    • 酷,有道理。最后一个问题,当您说“在将它们发送到接收器连接器之前”时,您不是说此转换将在源文件中吗?应该在水槽里吧?再次感谢!
    • 不客气!这是关于水槽连接器的。因此,您可以为您的案例想象 Sink 连接器的消息路径,如下所示:Kafka -> SMT Chain -> JDBC -> Postgres 此信息来自 Confluent 文档 (docs.confluent.io/current/connect/transforms/…): > SMT 在源连接器生成入站消息之后转换入站消息,但在它们被写入之前卡夫卡。 SMT 在将出站消息发送到接收器连接器之前对其进行转换。
    • 另外请阅读这篇关于SMT的非常有用的文章confluent.io/blog/…
    猜你喜欢
    • 1970-01-01
    • 2020-09-23
    • 2020-07-31
    • 2018-07-08
    • 2019-05-12
    • 2020-12-14
    • 2021-12-19
    • 1970-01-01
    • 2019-10-20
    相关资源
    最近更新 更多