【问题标题】:Apache Flink Process xml and write them to databaseApache Flink Process xml并将它们写入数据库
【发布时间】:2020-11-22 06:18:24
【问题描述】:

我有以下用例。

Xml 文件被写入我想通过 flink 使用和处理的 kafka 主题。 必须重命名 xml 属性以匹配数据库表列。这些重命名必须灵活且可在 flink 作业之外进行维护。 最后,必须将属性写入数据库。 每个 xml 文档代表一个数据库记录。

作为第二步,必须汇总过去 x 分钟内所有 xml 文档的所有属性。

据我所知,到目前为止,flink 能够完成所有提到的步骤,但我不知道如何正确地实现它。

目前我已经实现了 kafka 源,检索 xml 文档并通过自定义 MapFunction 解析它。在那里我创建了一个 POJO 并将每个属性名称和值存储在 HashMap 中。

public class Data{
    private Map<String,String> attributes = HashMap<>();
}

HashMap 包含:

Key: path.to.attribute.one Value: Value of attribute one

现在我想使用广播状态将原始属性名称更改为数据库列名称。 在这个阶段,我卡住了,因为我的 POJO 数据带有 HashMap 中的属性,但我不知道如何通过广播将其与映射连接。

另一种方法是将 xml 文档属性平面映射到单个记录中。这给我留下了两个问题:

  • 如何确保一个文档中的属性不会与流中另一个文档中的属性混合
  • 如何将一个文档的所有属性合并回来作为一条记录插入到数据库中

对于第二阶段,我知道 Window 功能,即使我没有详细了解它,但我想它会符合我的要求。这个阶段的问题是我是否可以在一项工作中使用多个接收器,而一个是原始数据流和一个聚合数据流。

有人可以帮忙提示一下吗?

干杯

更新 这是我到目前为止所得到的——我简化了 XmlData POJO 代表我解析的 xml 文档的代码。

public class StreamingJob {
    static Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        XmlData xmlData1 = new XmlData();
        xmlData1.addAttribute("path.to.attribute.eventName","Start");
        xmlData1.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:00.000");
        xmlData1.addAttribute("third.path.to.attribute.eventSource","Source1");
        xmlData1.addAttribute("path.to.attribute.additionalAttribute","Lorem");

        XmlData xmlData2 = new XmlData();
        xmlData2.addAttribute("path.to.attribute.eventName","Start");
        xmlData2.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
        xmlData2.addAttribute("third.path.to.attribute.eventSource","Source2");
        xmlData2.addAttribute("path.to.attribute.additionalAttribute","First");

        XmlData xmlData3 = new XmlData();
        xmlData3.addAttribute("path.to.attribute.eventName","Start");
        xmlData3.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
        xmlData3.addAttribute("third.path.to.attribute.eventSource","Source1");
        xmlData3.addAttribute("path.to.attribute.additionalAttribute","Day");

        Mapping mapping1 = new Mapping();
        mapping1.addMapping("path.to.attribute.eventName","EVENT_NAME");
        mapping1.addMapping("second.path.to.attribute.eventTimestamp","EVENT_TIMESTAMP");

        DataStream<Mapping> mappingDataStream = env.fromElements(mapping1);

        MapStateDescriptor<String, Mapping> mappingStateDescriptor = new MapStateDescriptor<String, Mapping>(
                "MappingBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Mapping>() {}));

        BroadcastStream<Mapping> mappingBroadcastStream = mappingDataStream.broadcast(mappingStateDescriptor);

        DataStream<XmlData> dataDataStream = env.fromElements(xmlData1, xmlData2, xmlData3);

        //Convert the xml with all attributes to a stream of attribute names and values
        DataStream<Tuple2<String, String>> recordDataStream = dataDataStream
                .flatMap(new CustomFlatMapFunction());

        //Map the attributes with the mapping information
        DataStream<Tuple2<String,String>> outputDataStream = recordDataStream
                .connect(mappingBroadcastStream)
                .process();

        env.execute("Process xml data and write it to database");
    }
    
    static class XmlData{
        private Map<String,String> attributes = new HashMap<>();

    public XmlData(){
        }

        public String toString(){
            return this.attributes.toString();
        }

        public Map<String,String> getColumns(){
            return this.attributes;
        }

        public void addAttribute(String key, String value){
            this.attributes.put(key,value);
        }

        public String getAttributeValue(String attributeName){
            return attributes.get(attributeName);
        }
    }
    
    static class Mapping{
        //First string is the attribute path and name
        //Second string is the database column name
        Map<String,String> mappingTuple = new HashMap<>();

        public Mapping(){}

        public void addMapping(String attributeNameWithPath, String databaseColumnName){
            this.mappingTuple.put(attributeNameWithPath,databaseColumnName);
        }

        public Map<String, String> getMappingTuple() {
            return mappingTuple;
        }

        public void setMappingTuple(Map<String, String> mappingTuple) {
            this.mappingTuple = mappingTuple;
        }
    }

    static class CustomFlatMapFunction implements FlatMapFunction<XmlData, Tuple2<String,String>> {

        @Override
        public void flatMap(XmlData xmlData, Collector<Tuple2< String,String>> collector) throws Exception {
            for(Map.Entry<String,String> entrySet : xmlData.getColumns().entrySet()){
                collector.collect(new Tuple2<>(entrySet.getKey(), entrySet.getValue()));
            }
        }
    }

    static class CustomBroadcastingFunction extends BroadcastProcessFunction {
        @Override
        public void processElement(Object o, ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
        }
        @Override
        public void processBroadcastElement(Object o, Context context, Collector collector) throws Exception {
        }
    }
}

【问题讨论】:

  • 这看起来与您的问题stackoverflow.com/questions/64894695/… 非常相似。最好只涵盖您需要从上一个问题的答案中澄清的问题。
  • 我只是想给出整个用例的大图,以便人们可以考虑回答这个问题或就另一个我没有想到的选项提供建议,因为我对这个主题很陌生.但是感谢您的评论,我已将问题编辑得更准确。

标签: java apache-flink flink-streaming


【解决方案1】:

下面是一些示例代码,说明如何使用 BroadcastStream 执行此操作。有一个微妙的问题,属性重映射数据可能会显示在 一条记录之后。通常,您会使用带状态的计时器来保留任何缺少重新映射数据的记录,但在您的情况下,不清楚丢失的重新映射是“需要等待更长时间”还是“不存在映射”。无论如何,这应该让你开始......

    private static MapStateDescriptor<String, String> REMAPPING_STATE = new MapStateDescriptor<>("remappings", String.class, String.class);

    
    @Test
    public void testUnkeyedStreamWithBroadcastStream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

        List<Tuple2<String, String>> attributeRemapping = new ArrayList<>();
        attributeRemapping.add(new Tuple2<>("one", "1"));
        attributeRemapping.add(new Tuple2<>("two", "2"));
        attributeRemapping.add(new Tuple2<>("three", "3"));
        attributeRemapping.add(new Tuple2<>("four", "4"));
        attributeRemapping.add(new Tuple2<>("five", "5"));
        attributeRemapping.add(new Tuple2<>("six", "6"));
        
        BroadcastStream<Tuple2<String, String>> attributes = env.fromCollection(attributeRemapping)
                .broadcast(REMAPPING_STATE);
        
        List<Map<String, Integer>> xmlData = new ArrayList<>();
        xmlData.add(makePOJO("one", 10));
        xmlData.add(makePOJO("two", 20));
        xmlData.add(makePOJO("three", 30));
        xmlData.add(makePOJO("four", 40));
        xmlData.add(makePOJO("five", 50));

        DataStream<Map<String, Integer>> records = env.fromCollection(xmlData);
        
        records.connect(attributes)
            .process(new MyRemappingFunction())
            .print();
        
        env.execute();
    }

    private Map<String, Integer> makePOJO(String key, int value) {
        Map<String, Integer> result = new HashMap<>();
        result.put(key, value);
        return result;
    }
    
    @SuppressWarnings("serial")
    private static class MyRemappingFunction extends BroadcastProcessFunction<Map<String, Integer>, Tuple2<String, String>, Map<String, Integer>> {

        @Override
        public void processBroadcastElement(Tuple2<String, String> in, Context ctx, Collector<Map<String, Integer>> out) throws Exception {
            ctx.getBroadcastState(REMAPPING_STATE).put(in.f0, in.f1);
        }

        @Override
        public void processElement(Map<String, Integer> in, ReadOnlyContext ctx, Collector<Map<String, Integer>> out) throws Exception {
            final ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(REMAPPING_STATE);

            Map<String, Integer> result = new HashMap<>();
            
            for (String key : in.keySet()) {
                if (state.contains(key)) {
                    result.put(state.get(key), in.get(key));
                } else {
                    result.put(key, in.get(key));
                }
            }
            
            out.collect(result);
        }
        
    }

【讨论】:

  • 您好,谢谢您的回答。我遇到了广播功能,但我被困在实施中。我不知道如何将 POJO 中的 HashMap 与映射结合起来。我已阅读链接提供的示例,但我不明白所有内容。提到了 ruleStream,但从未展示过它是如何实现的。
  • 当我通过属性键入流时,它不会使一个 xml 文档中的属性与另一个 xml 文档中的属性混合在一起吗?之后如何将属于一个xml文档的所有属性重新附加回来,以将它们保存为数据库中的一条记录?
  • 非常感谢,这让我找到了缺失的部分,我能够完成它。最重要的部分是,一个 xml 中的所有属性都不会与另一个 xml 中的属性混合在一起。我将 BroadcastProcessFunction 的输出更改为 XmlData pojo,但其余部分保持不变。干得好!
猜你喜欢
  • 2022-07-07
  • 2021-03-25
  • 1970-01-01
  • 2012-08-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-04-20
  • 2016-09-07
相关资源
最近更新 更多