【问题标题】:PostgreSQL and Kafka Connect integration issuePostgreSQL 和 Kafka Connect 集成问题
【发布时间】:2019-03-12 10:35:17
【问题描述】:

我正在测试 JDBC Sink 连接器以将记录从 Kafka 转储到 PostgreSQL。这是连接器配置:

{
    "name": "jdbc-sink-postgresql-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "role",
        "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=&password=",
        "auto.create": "false",                                                   
        "insert.mode": "upsert",
        "mode":"incrementing",
        "table.name.format":"role",
        "pk.mode":"record_value",
        "pk.fields":"role_id"
    }
}

当我运行连接器时,出现以下异常:

java.sql.BatchUpdateException: Batch entry 1 INSERT INTO "role" ("role_id","role_name") VALUES (123,'admin') ON CONFLICT ("role_id") DO UPDATE SET "role_name"=EXCLUDED."role_name" was aborted.  
   Call getNextException to see the cause.
   at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2778))

关于我在这里缺少什么的任何指针?如果需要更多信息,请告诉我。

【问题讨论】:

  • 你能确认你的表名和模式是正确的吗?
  • @Giorgos Myrianthous - 感谢您的快速回复。表和主题的名称是角色。架构名称是公共的,数据库名称是 postgres。
  • 你能不能尝试运行 INSERT INTO "role" ("role_id","role_name") VALUES (123,'admin') ON CONFLICT ("role_id") DO UPDATE SET "role_name"=EXCLUDED ."role_name" 看看你的数据库会发生什么?
  • 另外,你运行的是哪个 Kafka 版本?
  • 这是我注意到问题后做的第一件事,它运行良好,没有任何问题。我正在运行融合的 Kafka 4.0.0

标签: postgresql apache-kafka apache-kafka-connect


【解决方案1】:

所以,问题出在桌子上。这就是我最初创建表格的方式:

CREATE TABLE role(
 role_id int PRIMARY KEY,
 role_name VARCHAR (255) UNIQUE NOT NULL
);

topic中的测试数据是这样的:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic role --property schema.registry.url=http://localhost:8081/  --property value.schema='{"type":"record","name":"myRecord","fields": [{"name": "role_id","type": "int"},{"name": "role_name","type": "string"}]}' --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer io.confluent.kafka.serializers.KafkaAvroSerializer --property print.key=true
{"role_id":122, "role_name":"admin"}
{"role_id":123, "role_name":"admin"}
{"role_id":124, "role_name":"admin"}
{"role_id":125, "role_name":"admin"}
{"role_id":126, "role_name":"admin"}

因此,当我的测试数据一次又一次地为 role_name 字段具有相同的值时,它违反了唯一约束,因此出现了错误。

我做了什么?

我把桌子放下了。

创建了一个没有唯一键约束的新表,上面的数据被推送到PostgreSQL没有问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-05-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-11
    • 2020-12-11
    • 2018-12-14
    • 2016-12-26
    相关资源
    最近更新 更多