【问题标题】:Kafka Consumer + Schema Registry dynamic subject nameKafka Consumer + Schema Registry 动态主题名
【发布时间】:2022-01-10 22:46:03
【问题描述】:
我有多个主题 A、B 和 C,我正在使用 Kafka Streams 将它们扩展到主题 X。
主题 A、B 和 C 使用默认主题名称策略在模式注册表中注册为主题。流式传输非常愚蠢,它只是在不确保它们符合注册表中的架构的情况下将消息扇入其中,但它会在消息中添加 ORIGINAL_TOPIC_NAME 标头以表明它来自主题 A、B 或 C。
然后我让 Kafka Consumer 从主题 X 消费。该主题未在模式注册表中注册。 Kafka Consumer 是我使用 KafkaAvroDeserialiser 和 schema.registry.url 使用注册表消费的地方。我计划让消费者在这里对注册表进行检查,但使用 ORIGINAL_TOPIC_NAME 标头作为主题。但是我不确定我是否可以控制消费者以使其使用 kafka 标头来解析主题名称,因为主题名称策略是您在初始化时提供给消费者的东西。
有什么想法吗?
【问题讨论】:
标签:
apache-kafka
kafka-consumer-api
apache-kafka-streams
【解决方案1】:
策略被传递给反序列化器
反序列化函数只能访问键或值的传入字节数组,以及主题名称,而不是标题。
有一个可以设置的头部反序列化器,但它没有在键/值反序列化器的调用路径中使用。
解决方法是使用 ByteArrayDeserializer 作为键/值,可选择手动提取标题,使用类似这样的逻辑获取架构 ID
ByteBuffer bb = ByteBuffer.wrap(value); // or key
if (bb.get() != 0x0) {
// not a valid encoded Schema Registry record
}
int schemaId = bb.getInt();
// Completely ignores the subject
Schema schema = schemaRegistryClient.getId(schemaId);
// TODO: continue to deserialize the key/value byte array