【发布时间】:2019-04-30 11:47:59
【问题描述】:
上下文
我编写了几个小的Kafka Connect 连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与Schema Registry 集成,因此数据使用Avro 进行序列化。
我使用 fast-data-dev Docker image provided by Landoop 将它们部署到本地 Kafka 环境中
基本设置有效并每秒生成一条记录的消息
但是,我想更改subject name strategy。默认生成两个主题:
${topic}-key${topic}-value
根据我的用例,我需要生成具有不同模式的事件,这些事件最终会出现在同一个主题上。因此,我需要的主题名称是:
${topic}-${keyRecordName}${topic}-${valueRecordName}
根据the docs,我的需求适合TopicRecordNameStrategy
我尝试了什么
我创建了avroData 对象用于发送值以进行连接:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后使用它来创建SourceRecord 响应对象
The documentation 声明为了在 Kafka Connect 中使用模式注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,我会添加它们:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性并继续使用旧的${topic}-key 和${topic}-value 主题。
问题
Kafka Connect 应该支持不同的主题策略。我通过编写自己的AvroConverter 版本并硬编码主题策略是我需要的策略,设法解决了这个问题。但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题。我复制了主题,所以有一个旧版本的版本 (${topic}-key),它可以工作
为 Kafka Connect 指定主题策略的正确设置是什么?
【问题讨论】:
标签: apache-kafka avro apache-kafka-connect confluent-schema-registry