【问题标题】:Kafka Connect not working with Subject StrategiesKafka Connect 不适用于主题策略
【发布时间】:2019-04-30 11:47:59
【问题描述】:

上下文

我编写了几个小的Kafka Connect 连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与Schema Registry 集成,因此数据使用Avro 进行序列化。

我使用 fast-data-dev Docker image provided by Landoop 将它们部署到本地 Kafka 环境中

基本设置有效并每秒生成一条记录的消息

但是,我想更改subject name strategy。默认生成两个主题:

  • ${topic}-key
  • ${topic}-value

根据我的用例,我需要生成具有不同模式的事件,这些事件最终会出现在同一个主题上。因此,我需要的主题名称是:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

根据the docs,我的需求适合TopicRecordNameStrategy

我尝试了什么

我创建了avroData 对象用于发送值以进行连接:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

然后使用它来创建SourceRecord 响应对象

The documentation 声明为了在 Kafka Connect 中使用模式注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,我会添加它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性并继续使用旧的${topic}-key${topic}-value 主题。

问题

Kafka Connect 应该支持不同的主题策略。我通过编写自己的AvroConverter 版本并硬编码主题策略是我需要的策略,设法解决了这个问题。但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题。我复制了主题,所以有一个旧版本的版本 (${topic}-key),它可以工作

为 Kafka Connect 指定主题策略的正确设置是什么?

【问题讨论】:

    标签: apache-kafka avro apache-kafka-connect confluent-schema-registry


    【解决方案1】:

    您缺少 key.convertervalue.converter 前缀,以便将配置传递给转换器。所以而不是:

    key.subject.name.strategy
    value.subject.name.strategy
    

    你想要的:

    key.converter.key.subject.name.strategy
    value.converter.value.subject.name.strategy
    

    来源https://docs.confluent.io/current/connect/managing/configuring.html

    要将配置参数传递给键和值转换器,请在它们前面加上 key.converter.value.converter.,就像在工作配置中定义默认转换器时一样。请注意,这些仅在 key.convertervalue.converter 属性中指定了相应的转换器配置时使用。

    【讨论】:

    • 你是对的。我在文档中没有找到与此相关的任何内容(可能我没有充分挖掘)。您是否有指向某些文档的链接?这将完成答案
    • 编辑问题以包括参考
    • 对于上下文,这直到 Confluent 4.1.3,AFAIK 才起作用。 github.com/confluentinc/schema-registry/pull/801
    猜你喜欢
    • 2019-09-15
    • 2021-03-26
    • 1970-01-01
    • 1970-01-01
    • 2022-07-08
    • 2018-12-03
    • 2019-07-02
    • 1970-01-01
    • 2020-08-18
    相关资源
    最近更新 更多