【问题标题】:Kafka-confluent: How to use pk.mode=record_key for upsert and delete mode in JDBC sink connector?Kafka-confluent:如何在 JDBC 接收器连接器中使用 pk.mode=record_key 进行 upsert 和 delete 模式?
【发布时间】:2020-07-19 14:57:44
【问题描述】:

在 Kafka confluent 中,我们如何在使用 pk.mode=record_key 作为 MySQL 表中的复合键的同时将源用作 CSV 文件来使用 upsert? upsert 模式在使用pk.mode=record_values 时工作。是否需要进行其他配置?

如果我尝试使用pk.mode=record_key,我会收到此错误。错误 - 原因:org.apache.kafka.connect.errors.ConnectException:需要准确定义一个 PK 列,因为记录的键模式是原始类型。 以下是我的 JDBC 接收器连接器配置:

    {
    "name": "<name>",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic name>",
    "connection.url": "<url>",
    "connection.user": "<user name>",
    "connection.password": "*******",
    "insert.mode": "upsert",
    "batch.size": "50000",
    "table.name.format": "<table name>",
    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "auto.create": "true",
    "auto.evolve": "true",
    "max.retries": "10",
    "retry.backoff.ms": "3000",
    "mode": "bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    您需要使用pk.moderecord.value。 这意味着从消息的中获取字段并将它们用作目标表中的主键并用于UPSERT 目的。

    如果您设置record.key,它将尝试从 Kafka 消息 key 中获取关键字段。除非您确实在消息键中获得了值,否则这不是您要使用的设置。

    这些可能会进一步帮助您:

    【讨论】:

    • 感谢您的回答。我可以更新。我尝试使用pk.fields 中的字段列表,因为我的 MySQL 表有一个复合键并且它有效。但是如果我也在连接器中启用删除模式,那么我必须将pk.mode 设置为record_key。在这里我无法设置复合键。我在创建流时尝试使用partition by 来做到这一点。它在表中添加了一个新行,并且没有使用表的各个字段作为主键。我会参考链接再试一次。
    • 您的评论描述的问题与您在原始问题中提出的问题不同:) 您能否编辑您的原始问题并明确说明您遇到的问题是什么?
    • 我已经发布了关于 cmets 中提到的问题的另一个问题。 (stackoverflow.com/questions/61087098/…)
    • delete_enabled=truepk.mode=record_value 出现错误:org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled
    • @felicienb 查看rmoff.net/2021/03/12/…
    猜你喜欢
    • 1970-01-01
    • 2018-09-14
    • 2021-08-30
    • 2019-06-17
    • 2020-01-11
    • 2021-05-07
    • 2018-12-19
    • 2019-01-24
    • 2020-02-05
    相关资源
    最近更新 更多