【发布时间】:2020-07-19 14:57:44
【问题描述】:
在 Kafka confluent 中,我们如何在使用 pk.mode=record_key 作为 MySQL 表中的复合键的同时将源用作 CSV 文件来使用 upsert? upsert 模式在使用pk.mode=record_values 时工作。是否需要进行其他配置?
如果我尝试使用pk.mode=record_key,我会收到此错误。错误 - 原因:org.apache.kafka.connect.errors.ConnectException:需要准确定义一个 PK 列,因为记录的键模式是原始类型。
以下是我的 JDBC 接收器连接器配置:
{
"name": "<name>",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "<topic name>",
"connection.url": "<url>",
"connection.user": "<user name>",
"connection.password": "*******",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "<table name>",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
【问题讨论】:
标签: apache-kafka apache-kafka-connect confluent-platform