【发布时间】:2020-11-22 06:18:24
【问题描述】:
我有以下用例。
Xml 文件被写入我想通过 flink 使用和处理的 kafka 主题。 必须重命名 xml 属性以匹配数据库表列。这些重命名必须灵活且可在 flink 作业之外进行维护。 最后,必须将属性写入数据库。 每个 xml 文档代表一个数据库记录。
作为第二步,必须汇总过去 x 分钟内所有 xml 文档的所有属性。
据我所知,到目前为止,flink 能够完成所有提到的步骤,但我不知道如何正确地实现它。
目前我已经实现了 kafka 源,检索 xml 文档并通过自定义 MapFunction 解析它。在那里我创建了一个 POJO 并将每个属性名称和值存储在 HashMap 中。
public class Data{
private Map<String,String> attributes = HashMap<>();
}
HashMap 包含:
Key: path.to.attribute.one Value: Value of attribute one
现在我想使用广播状态将原始属性名称更改为数据库列名称。 在这个阶段,我卡住了,因为我的 POJO 数据带有 HashMap 中的属性,但我不知道如何通过广播将其与映射连接。
另一种方法是将 xml 文档属性平面映射到单个记录中。这给我留下了两个问题:
- 如何确保一个文档中的属性不会与流中另一个文档中的属性混合
- 如何将一个文档的所有属性合并回来作为一条记录插入到数据库中
对于第二阶段,我知道 Window 功能,即使我没有详细了解它,但我想它会符合我的要求。这个阶段的问题是我是否可以在一项工作中使用多个接收器,而一个是原始数据流和一个聚合数据流。
有人可以帮忙提示一下吗?
干杯
更新 这是我到目前为止所得到的——我简化了 XmlData POJO 代表我解析的 xml 文档的代码。
public class StreamingJob {
static Logger LOG = LoggerFactory.getLogger(StreamingJob.class);
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
XmlData xmlData1 = new XmlData();
xmlData1.addAttribute("path.to.attribute.eventName","Start");
xmlData1.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:00.000");
xmlData1.addAttribute("third.path.to.attribute.eventSource","Source1");
xmlData1.addAttribute("path.to.attribute.additionalAttribute","Lorem");
XmlData xmlData2 = new XmlData();
xmlData2.addAttribute("path.to.attribute.eventName","Start");
xmlData2.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
xmlData2.addAttribute("third.path.to.attribute.eventSource","Source2");
xmlData2.addAttribute("path.to.attribute.additionalAttribute","First");
XmlData xmlData3 = new XmlData();
xmlData3.addAttribute("path.to.attribute.eventName","Start");
xmlData3.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
xmlData3.addAttribute("third.path.to.attribute.eventSource","Source1");
xmlData3.addAttribute("path.to.attribute.additionalAttribute","Day");
Mapping mapping1 = new Mapping();
mapping1.addMapping("path.to.attribute.eventName","EVENT_NAME");
mapping1.addMapping("second.path.to.attribute.eventTimestamp","EVENT_TIMESTAMP");
DataStream<Mapping> mappingDataStream = env.fromElements(mapping1);
MapStateDescriptor<String, Mapping> mappingStateDescriptor = new MapStateDescriptor<String, Mapping>(
"MappingBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Mapping>() {}));
BroadcastStream<Mapping> mappingBroadcastStream = mappingDataStream.broadcast(mappingStateDescriptor);
DataStream<XmlData> dataDataStream = env.fromElements(xmlData1, xmlData2, xmlData3);
//Convert the xml with all attributes to a stream of attribute names and values
DataStream<Tuple2<String, String>> recordDataStream = dataDataStream
.flatMap(new CustomFlatMapFunction());
//Map the attributes with the mapping information
DataStream<Tuple2<String,String>> outputDataStream = recordDataStream
.connect(mappingBroadcastStream)
.process();
env.execute("Process xml data and write it to database");
}
static class XmlData{
private Map<String,String> attributes = new HashMap<>();
public XmlData(){
}
public String toString(){
return this.attributes.toString();
}
public Map<String,String> getColumns(){
return this.attributes;
}
public void addAttribute(String key, String value){
this.attributes.put(key,value);
}
public String getAttributeValue(String attributeName){
return attributes.get(attributeName);
}
}
static class Mapping{
//First string is the attribute path and name
//Second string is the database column name
Map<String,String> mappingTuple = new HashMap<>();
public Mapping(){}
public void addMapping(String attributeNameWithPath, String databaseColumnName){
this.mappingTuple.put(attributeNameWithPath,databaseColumnName);
}
public Map<String, String> getMappingTuple() {
return mappingTuple;
}
public void setMappingTuple(Map<String, String> mappingTuple) {
this.mappingTuple = mappingTuple;
}
}
static class CustomFlatMapFunction implements FlatMapFunction<XmlData, Tuple2<String,String>> {
@Override
public void flatMap(XmlData xmlData, Collector<Tuple2< String,String>> collector) throws Exception {
for(Map.Entry<String,String> entrySet : xmlData.getColumns().entrySet()){
collector.collect(new Tuple2<>(entrySet.getKey(), entrySet.getValue()));
}
}
}
static class CustomBroadcastingFunction extends BroadcastProcessFunction {
@Override
public void processElement(Object o, ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
}
@Override
public void processBroadcastElement(Object o, Context context, Collector collector) throws Exception {
}
}
}
【问题讨论】:
-
这看起来与您的问题stackoverflow.com/questions/64894695/… 非常相似。最好只涵盖您需要从上一个问题的答案中澄清的问题。
-
我只是想给出整个用例的大图,以便人们可以考虑回答这个问题或就另一个我没有想到的选项提供建议,因为我对这个主题很陌生.但是感谢您的评论,我已将问题编辑得更准确。
标签: java apache-flink flink-streaming