【问题标题】:Flink - serialize a pojo to Kafka sinkFlink - 将 pojo 序列化到 Kafka 接收器
【发布时间】:2021-04-15 14:40:05
【问题描述】:

我的 Flink 代码的结构是: 使用 kafka (topic_1_in) 获取数据 -> 反序列化消息 -> 映射 -> 操作数据 -> 获取 POJO -> 序列化消息 -> 使用 kafka (topic_1_out) 发送数据

我现在正处于最后一个阶段,我想序列化我的 POJO。我在 Flink 网站上找到了以下示例:

DataStream<String> stream = ...

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

    FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
            "my-topic",                  // target topic
            new SimpleStringSchema(),    // serialization schema
            properties,                  // producer config
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
    
    stream.addSink(myProducer);

但我不明白如何实现序列化架构。

我还阅读了不同的可能性:

https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

但是,对于如何将我的 POJO 转换为字符串以提供 Kafka 接收器,我仍然有些困惑。这个类真的很简单,所以我认为这很简单。

public class POJO_block {
            public Double id;
            public Double tr_p;
            public Integer size;
            public Double last_info;
            public Long millis_last;
            private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;

}

任何例子都将不胜感激。

谢谢

【问题讨论】:

    标签: java apache-kafka apache-flink flink-streaming


    【解决方案1】:

    问题中提到的link是指内部的Flink序列化,当Flink需要将我们的一些数据从集群的一个部分传送到另一个部分时使用它,但在写入Kafka时不相关。

    当 Flink 与外部存储(如 Kafka)交互时,它依赖于一个连接器,而在这样做时如何进行序列化取决于该连接器的配置细节以及该连接器的具体机制。底层外部存储(例如,在 kafka 记录的情况下,keyvalue 等概念)。

    在您描述的情况下,由于您的程序正在使用 DataStream API 并与 Kafka 通信,因此您使用的连接器是 Kafka Datastream API,其文档位于 here

    在您提供的代码中,FlinkKafkaProducer sink 的这个参数指定了序列化的发生方式:

    // this is probably not what you want:
    new SimpleStringSchema(),    // serialization schema
    

    此配置不起作用,因为SimpleStringSchema 期望字符串作为输入,因此POJO_block 的流将使其失败。

    您可以改为传递org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema 的任何实现,其中包含一个主函数,让您定义kafka 键的字节值和对应于每个POJO_block 块实例的值(即下面的T):

    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
    

    请注意,如果您使用 Table API 而不是 DataStream API 来读取和写入 Kafka,则将使用 this connector,它具有方便的 format 配置和现成的格式,如 csv、json , avro, Debezium...

    【讨论】:

    • 嗨,你能给我一个 KafkaSerializationSchema 实现的例子吗?是的,我正在使用 DataStream API。感谢您的帮助。
    • 嗨,我找到了这个很好的例子,(stackoverflow.com/questions/58644549/…),但考虑到我将在 python 中使用消息,我正在考虑将我的 POJO 转换为字符串然后序列化它,而不是发送一个 POJO 并在 python 中努力打开它。
    • 嗨,您找到的示例似乎相当不错:b= mapper.writeValueAsBytes(obj); 行实际上是将 pojo 序列化为 json(作为byte[],因为这是 Kafka 需要的,尽管这些字节实际上是 UTF- 8个字符串编码)=>我希望python应用程序能够毫无问题地解析它。如果你想快速仔细检查,你也可以安装伟大的 kafkacat 工具,然后在 linux 命令行上用简单的kafkacat -C -b localhost:9092 -t my-topic 打印 Kafka 的内容,这应该将POJO_block 的 json 版本打印到标准输出。
    • 我不知道 Kafkacat 这很酷!感谢您的帮助,非常感谢。 s
    猜你喜欢
    • 1970-01-01
    • 2021-09-09
    • 1970-01-01
    • 2020-08-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-29
    • 2019-01-15
    相关资源
    最近更新 更多