【问题标题】:Kafka Stream Ksql Json卡夫卡流Ksql Json
【发布时间】:2019-11-13 13:34:51
【问题描述】:

kafka 流/Ksql 是否真的以某种方式原生支持 json?还支持哪些其他格式?我已经看到有可能将平面 json 解释为表格。我想更好地理解那部分; kafka-streams 通过 Ksql 可以通过 SQL 查询的其他格式是什么?这怎么可能或支持?什么是原生支持?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams ksqldb


    【解决方案1】:

    KSQL

    对于值格式,KSQL 支持 AVRO、JSON 和 DELIMITED(如 CSV)。

    您可以在此处找到文档:

    Kafka 流

    Kafka Streams 在org.apache.kafka.common.serialization 包下带有一些原始/基本的 SerDes(序列化器/反序列化器)。

    您可以在此处找到文档:

    Confluent 还为通用 Avro 和特定 Avro 格式的数据提供兼容模式注册表的 Avro SerDes。您可以在此处找到文档:

    您还可以使用示例附带的用于 JSON 的基本 SerDe 实现

    作为最后的手段,您始终可以创建自己的自定义 SerDes。为此,您必须:

    1. 通过实现为您的数据类型T 编写一个序列化程序 org.apache.kafka.common.serialization.Serializer
    2. 通过实现为T 编写反序列化器 org.apache.kafka.common.serialization.Deserializer
    3. 通过实现为T 编写一个serde org.apache.kafka.common.serialization.Serde,你要么做 手动(参见上一节中现有的 SerDes)或通过 利用 Serdes 中的辅助函数,例如 Serdes.serdeFrom(Serializer<T>, Deserializer<T>)。请注意,您将 如果你需要实现你自己的类(没有泛型类型) 想要在提供的配置中使用您的自定义 serde 卡夫卡流。如果您的 serde 类具有泛型类型或者您使用 Serdes.serdeFrom(Serializer<T>, Deserializer<T>),你可以通过你的 serde 仅通过方法调用(例如 builder.stream("topicName", Consumed.with(...)))。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-10
      • 2017-02-08
      • 2018-03-06
      • 2016-08-03
      • 2018-09-15
      • 2018-03-07
      相关资源
      最近更新 更多