【Posted】: 2016-05-28 19:07:33
【Question】:
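I am reading JSON strings from a Kafka topic, parsing them with the Jackson library, and trying to join the result with another stream.
Below is sample code with two data streams. I want to perform a join on these message streams.
For example, the incoming messages are: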
messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}
The join condition is messageStream1."A" = messageStream2."B". How can this be implemented in Flink?
Data stream 1:
DataStream<String> messageStream1 = env.addSource(
        new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
// Keep the mapped stream in a variable so it can be used downstream;
// without the assignment the result of map() is discarded.
DataStream<JsonNode> jsonStream1 = messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            // Parse the raw Kafka message into a JSON tree.
            JsonNode rootNode = mapper.readTree(value);
            // Print every top-level field for debugging.
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});
Data stream 2:
DataStream<String> messageStream2 = env.addSource(
        new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
// Keep the mapped stream in a variable so it can be used downstream;
// without the assignment the result of map() is discarded.
DataStream<JsonNode> jsonStream2 = messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            // Parse the raw Kafka message into a JSON tree.
            JsonNode rootNode = mapper.readTree(value);
            // Print every top-level field for debugging.
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});
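To pin down what the join condition messageStream1."A" = messageStream2."B" means, here is a minimal plain-Java sketch. It is only an illustration of the key-equality semantics on small in-memory lists: it does not use Flink or Jackson, the class name `JoinConditionSketch` and the method `joinOnKeys` are hypothetical, and records are modelled as `Map<String, String>` instead of `JsonNode`. A join over unbounded streams would additionally need a window or some form of state, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the join condition only; not Flink code.
class JoinConditionSketch {

    // Pairs each left record with every right record whose value under
    // rightKey equals the left record's value under leftKey.
    static List<String> joinOnKeys(List<Map<String, String>> left, String leftKey,
                                   List<Map<String, String>> right, String rightKey) {
        // Index the right side by its join key value.
        Map<String, List<Map<String, String>>> index = new HashMap<>();
        for (Map<String, String> r : right) {
            String key = r.get(rightKey);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
            }
        }
        // Probe the index with each left record.
        List<String> joined = new ArrayList<>();
        for (Map<String, String> l : left) {
            String key = l.get(leftKey);
            if (key == null) {
                continue; // records without the join key never match
            }
            for (Map<String, String> r : index.getOrDefault(key, Collections.emptyList())) {
                joined.add(l + " <-> " + r);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Sample records shaped like the messages in the question.
        List<Map<String, String>> stream1 = Arrays.asList(
                Collections.singletonMap("A", "a"),
                Collections.singletonMap("A", "x"));
        List<Map<String, String>> stream2 = Arrays.asList(
                Collections.singletonMap("B", "a"),
                Collections.singletonMap("B", "y"));
        for (String pair : joinOnKeys(stream1, "A", stream2, "B")) {
            System.out.println(pair);
        }
    }
}
```

With the sample records above, only {"A":"a"} and {"B":"a"} share a key value, so a single pair is produced; the records with values "x" and "y" have no partner and are dropped, as in an inner join.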
【Comments】:
Tags: join stream apache-kafka apache-flink