【发布时间】:2019-09-18 18:48:55
【问题描述】:
我有一个关于 jdbc-sink 的问题。
postgres1 ---> kafka ---> postgres2
生产者工作正常,但消费者有错误:
连接_1 | org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException:java.sql.BatchUpdateException:批处理条目 0 插入“客户”(“id”)值(1)ON CONFLICT(“id”)做更新 SET 已中止:错误:输入 connect_1 末尾的语法错误 |
位置:77 调用getNextException查看批处理中的其他错误。
这是我的 source.json
{
"name": "src-table",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres1_container",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.whitelist": "postgres",
"database.server.name": "postgres1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
这是我的 jdbc-sink.json
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres2_container:5432/postgres?user=postgres&password=postgres",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
debezium/动物园管理员:0.9
debezium/kafka:0.9
debezium/postgres:9.6
debezium/connect:0.9
PostgreSQL JDBC 驱动程序 42.2.5
Kafka Connect JDBC 5.2.1
我尝试降级 jdbc 驱动程序和 confluent kafka connect 但仍然有同样的错误
【问题讨论】:
-
我发了一个和你类似的问题,你能看看吗?stackoverflow.com/questions/68575974/…
标签: postgresql jdbc apache-kafka docker-compose debezium