【问题标题】:Apache Flink - Serialize json and perform join operationApache Flink - 序列化 json 并执行连接操作
【发布时间】:2016-05-28 19:07:33
【问题描述】:

我正在尝试使用 Jackson 库从 Kafka 主题中读取字符串并执行来自另一个流的连接。

这是一个包含两个数据流的示例代码。我想对这些消息流执行连接操作。

例如,传入的流是:

messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}

加入条件是messageStream1."A" = messageStream2."B"。如何在 Flink 中实现?

数据流 1:

DataStream<String> messageStream1 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        }catch (java.io.IOException ex){
            ex.printStackTrace();
            return null;
        }
    }
});

数据流 2:

DataStream<String> messageStream2 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        }catch (java.io.IOException ex){
            ex.printStackTrace();
            return null;
        }
    }
});

【问题讨论】:

    标签: join stream apache-kafka apache-flink


    【解决方案1】:

    您需要将键字段提取到一个额外的属性中,以便 Flink 可以访问它(另一种方法是提供自定义键选择器:https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys)。

    因此,map(...) 的返回类型可能是 Tuple2&lt;String,JsonNode&gt;(如果 String 是连接属性的正确类型)。

    然后,您可以按照文档 (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html) 中的说明指定您的加入:

    messageStream1.join(messageStream2)
        .where(0).equalTo(0) // both zeros indicate that the join happens on the zero's attribute, ie, the String attribute of Tuple2
        .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
        .apply(new JoinFunction() {...});
    

    为了使用DataStream API 执行连接,您还需要指定一个连接窗口。只能连接属于同一窗口的元组。

    【讨论】:

    • 感谢您的回复。我没有输入数据的固定结构。我所知道的是它是一个 JSON。如果是这种情况,我是否需要借助反射来动态自动创建对象?我知道因此可能会产生开销。
    • 如果我理解正确,您有两个 JSON 对象流并且想要加入两者。因此,您希望每个对象至少有一个固定字段“X”,其中包含要用于连接的已知类型“T”。因此,您可以提取“X”并将其放入 Tuple2 中,该 Tuple2 在其两个属性中保存键和整个 JSON。还是有可能某些对象不包含“X”?如果是,您的应对策略是什么?
    • 你是对的。让我稍微改写一下。 Stream1 = {"A":"a", "B":"b"} Stream2 = {"B":"a","X":"x"} 鉴于这些是 2 个流,如果我必须加入 Stream1.A=Stream2.B 如果我没听错的话,map 函数应该返回带有 的元组?
    • 不确定我是否理解正确。似乎在您的示例中,每个流仅显示一个元组。如果我们假设 A 和 B 是您想要加入的字段,map 会将 JSON {"A":"a", "B":"b"} 转换为 &lt;"a", {"A":"a", "B":"b"}&gt;(即 Tuple2&lt;String, JsonNode&gt;)。与 stream2 类似,{"B":"a","X":"x"} 转换为 &lt;"a", {"B":"a","X":"x"}&gt;。因此,您只需提取键字段值,并将键与完整的 JSON 对象一起发出。
    • 抱歉 .. 回复需要一点时间。感谢您的解释。我离我的理解还差得很远。再次感谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-05-05
    • 1970-01-01
    • 2020-10-17
    • 2018-12-18
    • 2017-09-18
    • 1970-01-01
    • 2020-08-17
    相关资源
    最近更新 更多