【发布时间】:2021-09-08 09:11:45
【问题描述】:
我有一个环境,我使用一个 Kafka Connect Worker,它使用来自 Oracle 数据库的一些数据,然后将其推送到 Avro 格式的 Kafka 主题中。
现在,我需要创建一个 Kafka Connect Sink 来使用此 AVRO 消息,将其转换为 Json,然后将其写入 Redis 数据库。
到目前为止,我只能在 Redis 上写入我从主题中使用的相同 AVRO 消息。我曾尝试使用转换器,但我可能误解了它们的用法。
下面是我对工人和水槽的配置。
{
"name": "SOURCE",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "createKey, extractStr",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractStr.field": "ID",
"connection.url": "<>",
"connection.user": "<>",
"connection.password": "<>",
"table.whitelist": "V_TEST_C",
"schema.pattern": "<>",
"numeric.mapping": "best_fit",
"mode": "timestamp+incrementing",
"incrementing.column.name": "CID",
"timestamp.column.name": "TS_ULT_ALT",
"validate.non.null": "false",
"table.types": "VIEW",
"retention.ms":12000,
"poll.interval.ms": "30000",
"topic.prefix": "TEST.",
"value.converter.schema.registry.url": "<>"
}
}
下沉
{
"name": "SINK",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "V_TEST_C",
"redis.hosts": "redis:6379",
"schema.registry.url": "<>",
"value.converter.schema.registry.url": "<>",
"value.converter.schemas.enable":"false",
"key.converter.schemas.enable":"false",
"insert.mode": "UPSERT",
"delete.enabled": "false",
"quote.sql.identifier": "never"
}
}
【问题讨论】:
-
您的源连接器使用的是 JSON,而不是 Avro...
-
无论如何,转换器不会像您想象的那样“转换”(在序列化格式之间)。它将这些类型(JSON/Avro)转换为 Connect Framework 内部的
Struct和Schema类,而 Sink 和 Source 类使用它们来实际读取/写入数据
标签: json apache-kafka redis avro apache-kafka-connect