【问题标题】:Kafka Consumer + Schema Registry dynamic subject nameKafka Consumer + Schema Registry 动态主题名
【发布时间】:2022-01-10 22:46:03
【问题描述】:

我有多个主题 A、B 和 C,我正在使用 Kafka Streams 将它们扩展到主题 X。 主题 A、B 和 C 使用默认主题名称策略在模式注册表中注册为主题。流式传输非常愚蠢,它只是在不确保它们符合注册表中的架构的情况下将消息扇入其中,但它会在消息中添加 ORIGINAL_TOPIC_NAME 标头以表明它来自主题 A、B 或 C。

然后我让 Kafka Consumer 从主题 X 消费。该主题未在模式注册表中注册。 Kafka Consumer 是我使用 KafkaAvroDeserialiser 和 schema.registry.url 使用注册表消费的地方。我计划让消费者在这里对注册表进行检查,但使用 ORIGINAL_TOPIC_NAME 标头作为主题。但是我不确定我是否可以控制消费者以使其使用 kafka 标头来解析主题名称,因为主题名称策略是您在初始化时提供给消费者的东西。

有什么想法吗?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    策略被传递给反序列化器

    反序列化函数只能访问键或值的传入字节数组,以及主题名称,而不是标题。

    有一个可以设置的头部反序列化器,但它没有在键/值反序列化器的调用路径中使用。


    解决方法是使用 ByteArrayDeserializer 作为键/值,可选择手动提取标题,使用类似这样的逻辑获取架构 ID

    ByteBuffer bb = ByteBuffer.wrap(value); // or key
    if (bb.get() != 0x0) {
        // not a valid encoded Schema Registry record 
    }
    int schemaId = bb.getInt();
    // Completely ignores the subject 
    Schema schema = schemaRegistryClient.getId(schemaId);
    // TODO: continue to deserialize the key/value byte array 
    

    【讨论】:

      猜你喜欢
      • 2021-03-08
      • 1970-01-01
      • 2022-12-24
      • 2021-07-26
      • 2019-07-21
      • 2019-12-07
      • 2020-09-12
      • 2021-05-20
      • 2020-09-07
      相关资源
      最近更新 更多