【问题标题】:Tables require a primary key when create a table with kafka topic使用 kafka 主题创建表时,表需要主键
【发布时间】:2020-11-03 10:01:41
【问题描述】:

我有一个 mysql 表,如下所示: 我使用 kafka 连接器将此表添加到 kafka 主题:

ksql> CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'mysql',
    'database.port' = '3306',
    'database.user' = 'debezium',
    'database.password' = 'dbz',
    'database.server.id' = '42',
    'database.server.name' = 'asgard',
    'table.whitelist' = 'demo.customers',
    'database.history.kafka.bootstrap.servers' = 'kafka:29092',
    'database.history.kafka.topic' = 'dbhistory.demo' ,
    'include.schema.changes' = 'false',
    'transforms'= 'unwrap,extractkey',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field'= 'id',
    'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry:8081'
    );

然后我想用它来创建一个基于它的表:

CREATE TABLE  CUSTOMERS WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

然后我得到这个错误:

Tables require a PRIMARY KEY. Please define the PRIMARY KEY.
Use a partial schema to define the primary key and still load the value columns from the Schema Registry, for example:
        CREATE TABLE CUSTOMERS (ID INT PRIMARY KEY) WITH (...);

当我按照建议进行更改时:

CREATE TABLE CUSTOMERS (id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

我收到此错误:

Duplicate column names: `ID`

进行了一些搜索,但仍然停留在此处。这有什么问题,我该如何解决?谢谢

【问题讨论】:

    标签: apache-kafka ksqldb


    【解决方案1】:

    问题是您分配给 PK 列的名称与从架构注册表加载的 Avro 架构中的列名称冲突。

    您可以随意命名您的键列,因为列名不会保留在任何地方,所以只需将其命名为不冲突的名称,例如customer_id.

    CREATE TABLE CUSTOMERS (customer_id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
    

    还注意到您有:

    'key.converter'='org.apache.kafka.connect.storage.StringConverter'

    我相信这会将密钥转换为STRING。不确定这是故意的。 ksqlDB 也可以与INT PK 一起使用...

    【讨论】:

      猜你喜欢
      • 2020-10-04
      • 2019-06-22
      • 2013-12-13
      • 1970-01-01
      • 1970-01-01
      • 2020-01-20
      • 1970-01-01
      • 1970-01-01
      • 2018-07-27
      相关资源
      最近更新 更多