【发布时间】:2019-11-13 13:34:51
【问题描述】:
kafka 流/Ksql 是否真的以某种方式原生支持 json?还支持哪些其他格式?我已经看到有可能将平面 json 解释为表格。我想更好地理解那部分; kafka-streams 通过 Ksql 可以通过 SQL 查询的其他格式是什么?这怎么可能或支持?什么是原生支持?
【问题讨论】:
标签: apache-kafka apache-kafka-streams ksqldb
kafka 流/Ksql 是否真的以某种方式原生支持 json?还支持哪些其他格式?我已经看到有可能将平面 json 解释为表格。我想更好地理解那部分; kafka-streams 通过 Ksql 可以通过 SQL 查询的其他格式是什么?这怎么可能或支持?什么是原生支持?
【问题讨论】:
标签: apache-kafka apache-kafka-streams ksqldb
KSQL
对于值格式,KSQL 支持 AVRO、JSON 和 DELIMITED(如 CSV)。
您可以在此处找到文档:
Kafka 流
Kafka Streams 在org.apache.kafka.common.serialization 包下带有一些原始/基本的 SerDes(序列化器/反序列化器)。
您可以在此处找到文档:
Confluent 还为通用 Avro 和特定 Avro 格式的数据提供兼容模式注册表的 Avro SerDes。您可以在此处找到文档:
您还可以使用示例附带的用于 JSON 的基本 SerDe 实现:
作为最后的手段,您始终可以创建自己的自定义 SerDes。为此,您必须:
T 编写一个序列化程序
org.apache.kafka.common.serialization.Serializer。T 编写反序列化器
org.apache.kafka.common.serialization.Deserializer。T 编写一个serde
org.apache.kafka.common.serialization.Serde,你要么做
手动(参见上一节中现有的 SerDes)或通过
利用 Serdes 中的辅助函数,例如
Serdes.serdeFrom(Serializer<T>, Deserializer<T>)。请注意,您将
如果你需要实现你自己的类(没有泛型类型)
想要在提供的配置中使用您的自定义 serde
卡夫卡流。如果您的 serde 类具有泛型类型或者您使用
Serdes.serdeFrom(Serializer<T>, Deserializer<T>),你可以通过你的
serde 仅通过方法调用(例如
builder.stream("topicName", Consumed.with(...)))。【讨论】: