【发布时间】:2021-04-15 14:40:05
【问题描述】:
我的 Flink 代码的结构是: 使用 kafka (topic_1_in) 获取数据 -> 反序列化消息 -> 映射 -> 操作数据 -> 获取 POJO -> 序列化消息 -> 使用 kafka (topic_1_out) 发送数据
我现在正处于最后一个阶段,我想序列化我的 POJO。我在 Flink 网站上找到了以下示例:
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);
但我不明白如何实现序列化架构。
我还阅读了不同的可能性:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
但是,对于如何将我的 POJO 转换为字符串以提供 Kafka 接收器,我仍然有些困惑。这个类真的很简单,所以我认为这很简单。
public class POJO_block {
public Double id;
public Double tr_p;
public Integer size;
public Double last_info;
public Long millis_last;
private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}
任何例子都将不胜感激。
谢谢
【问题讨论】:
标签: java apache-kafka apache-flink flink-streaming