【Question Title】: Kafka Connect ignoring the specified subject name strategies
【Posted】: 2019-09-15 10:50:55
【Question Description】:

I want to publish data from multiple tables to the same Kafka topic using the connector configuration below, but I am seeing the following exception.

Exception

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409

The connector seems to ignore the subject name strategy properties I set and keeps registering schemas under the default ${topic}-key and ${topic}-value subjects.
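Which subjects were actually registered can be confirmed against the Schema Registry REST API. A diagnostic sketch (assuming the registry at http://schema-registry:8081 from the log below, and that records are being routed to a topic named customer):

```shell
# List every registered subject. Under the default TopicNameStrategy you
# will see customer-key / customer-value rather than per-record subjects.
curl -s http://schema-registry:8081/subjects

# Show the versions registered under the value subject that is rejecting
# the second table's schema with HTTP 409.
curl -s http://schema-registry:8081/subjects/customer-value/versions
```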

[2019-04-25 22:43:45,590] INFO AvroConverterConfig values: 
    schema.registry.url = [http://schema-registry:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

Connector configuration

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
      "name": "two-in-one-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "xxxxxxx",
        "database.port": "3306",
        "database.user": "xxxxxxx",
        "database.password": "xxxxxxxxx",
        "database.server.id": "18405457",
        "database.server.name": "xxxxxxxxxx",
        "table.whitelist": "customers,phone_book",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "dbhistory.customer",
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"(.*)",
        "transforms.dropPrefix.replacement":"customer",
        "key.converter.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
        "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
      }
    }'
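Note that the configuration above sets subject name strategy properties for the converters without configuring the converters themselves at the connector level; in that case the worker-wide default converters (with their own settings) may be what actually runs. A hedged sketch of the config fragment that pairs the strategy with an explicitly configured Avro converter (the registry URL is taken from the log above; whether the `key.converter.`/`value.converter.`-prefixed strategy properties are honored depends on the Connect and Schema Registry versions, as the discussion below notes):

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
}
```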

【Question Comments】:

Tags: apache-kafka avro apache-kafka-connect confluent-schema-registry


【Solution 1】:

Try setting the strategy classes with the following parameters in the connector configuration (JSON), instead of "key.converter.key.subject.name.strategy" and "value.converter.value.subject.name.strategy":

    "key.subject.name.strategy"
    "value.subject.name.strategy"
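Whichever property spelling the running version honors, the effect of switching away from the default TopicNameStrategy is easiest to see by computing the subject names directly. A minimal re-implementation sketch of the three strategies' naming rules (my own illustration, not Confluent's code; the record full names below are made-up Debezium-style examples):

```python
# How the three Confluent subject name strategies derive the Schema
# Registry subject for a record (illustrative re-implementation; the
# real classes live in io.confluent.kafka.serializers.subject).

def topic_name_strategy(topic: str, record_fullname: str, is_key: bool) -> str:
    # Default: one subject per topic, so every record type published to
    # the topic must be schema-compatible with the previous ones.
    return f"{topic}-{'key' if is_key else 'value'}"

def record_name_strategy(topic: str, record_fullname: str, is_key: bool) -> str:
    # One subject per record type, regardless of topic.
    return record_fullname

def topic_record_name_strategy(topic: str, record_fullname: str, is_key: bool) -> str:
    # One subject per (topic, record type) pair: several record types can
    # share a topic without tripping compatibility checks.
    return f"{topic}-{record_fullname}"

# With the RegexRouter above, both tables land on the "customer" topic:
print(topic_name_strategy("customer", "db.customers.Envelope", False))
# -> customer-value (one subject for both tables: the 409 incompatible-schema error)
print(topic_record_name_strategy("customer", "db.customers.Envelope", False))
# -> customer-db.customers.Envelope
print(topic_record_name_strategy("customer", "db.phone_book.Envelope", False))
# -> customer-db.phone_book.Envelope (a separate subject per table)
```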

【Discussion】:

  • Have you tested these? AvroConverterConfig first takes the key.converter / value.converter prefixes... much like value.converter.schema.registry.url, it should be value.converter.value.subject.name.strategy
  • @cricket_007 I am already using "value.converter.value.subject.name.strategy" in my connector config. It still doesn't work.
  • @Suraj As I answered in the other post I linked to, I don't believe these properties were ever tested through the Connect API, and they are not clearly documented anywhere outside the PR that added them