【问题标题】:Kafka Streams: how to convert data types?Kafka Streams:如何转换数据类型?
【发布时间】:2018-04-11 00:12:35
【问题描述】:

我正在使用 Kafka 的 Streams API 和拓扑构建器。 我想知道如何才能拥有一个可以将一种数据类型转换为另一种数据类型的处理器,以便管道中的下一个处理器可以使用它。

作为一个简单的用例:

[topic]--(string)-->[processor: parse json]--(object)-->[processor 2]--(object)-->[sink]

有什么想法吗?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    我假设您想将 Kafka 主题中的消息 values 从 String 转换为 JSON。

    你只需要两部分:

    • 将字符串转换为 JSON(或 Pojo)的代码。在这里选择你需要的任何东西,例如有几个 Java 库可以让这变得简单。
    • 在 Kafka Streams 中,定义 (1) String 的值 serde 用于从源主题中读取,以及 (2) 定义相应的值 serde 以将 JSON 数据(或 Pojo)写入目标主题。 Serdes 需要在需要的时间/地点实现您的数据(例如,将您的 Pojos 写入 Kafka 需要实现)。

    请参阅https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview 下的示例代码,了解如何操作。将 JSON 与 Apache Kafka 的 Streams API 结合使用。

    【讨论】:

      猜你喜欢
      • 2018-02-28
      • 2021-03-09
      • 1970-01-01
      • 1970-01-01
      • 2021-10-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-22
      相关资源
      最近更新 更多