【发布时间】:2020-09-25 04:43:21
【问题描述】:
我正在尝试使用 Kafka Connect 将数据从旧的 DB2 数据库同步到使用 JDBC 源和接收器连接器的 Postgres 数据库。它工作正常,但前提是我对用于表名的情况非常严格。
例如,我在 DB2 中有一个名为 ACTION 的表,它也存在于 Postgres 中,具有相同的列等。唯一的区别是在 DB2 中它是大写的 ACTION,而在 Postgres 中它是小写的 action。
这是一个有效的接收器文件:
{
"name": "jdbc_sink_pg_action",
"config": {
"_comment": "The JDBC connector class",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"_comment": "How to serialise the value of keys ",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"_comment": " --- JDBC-specific configuration below here --- ",
"_comment": "JDBC connection URL.",
"connection.url": "jdbc:postgresql://localhost:5435/postgres",
"connection.user": "postgres",
"connection.password": "*****",
"topics": "ACTION",
"table.name.format": "action",
"_comment": "The insertion mode to use",
"insert.mode": "upsert",
"_comment": "The primary key mode",
"pk.mode": "record_value",
"_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
"pk.fields": "ACTION_ID",
"quote.sql.identifiers": "never"
}
}
这没问题,但它不是很灵活。例如,我有许多其他表,我也想同步它们,但我不想为每个表创建一个连接器文件。所以我尝试使用:
"table.name.format": "${topic}",
当我这样做时,当我尝试加载我的接收器连接器时,我在日志中收到以下错误:
引起:org.apache.kafka.connect.errors.ConnectException:表“ACTION” 丢失并且自动创建被禁用
所以在我看来"quote.sql.identifiers": "never" 实际上并没有工作,否则接收器连接器正在执行的查询将不被引用,并且它允许任何情况(它会转换为更低)。
为什么这不起作用?如果我只使用 ACTION 作为 table.name.format,我会得到相同的结果。
【问题讨论】:
标签: postgresql apache-kafka apache-kafka-connect