【Posted】: 2017-10-19 10:08:59
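【Question】:
I am still quite new to Flink. I have the code below, which maps, groups, and sums values from an input JSON string.
It is very similar to the word count example.
I expected to get (vacant,1) (occupied,2).
However, for some reason I am getting (occupied,1) (vacant,1) (occupied,2).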
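// Imports the snippet relies on; the Jackson package is assumed to be the standard
// com.fasterxml one, since the original post does not show its imports.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// The class name is illustrative; the original post only shows the main method.
public class PortStatusCount {
    public static void main(String[] args) throws Exception {
        // A single JSON document mapping port names to their status
        String s = "{\n" +
                " \"Port_128\": \"occupied\",\n" +
                " \"Port_129\": \"occupied\",\n" +
                " \"Port_120\": \"vacant\"\n" +
                "\n" +
                "}";
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> in = env.fromElements(s);
        SingleOutputStreamOperator<Tuple2<String, Integer>> t =
                in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        JsonNode node = mapper.readTree(s);
                        // Emit (status, 1) for every value in the JSON object
                        node.elements().forEachRemaining(v -> {
                            collector.collect(new Tuple2<>(v.textValue(), 1));
                        });
                    }
                }).keyBy(0).sum(1); // key by the status (field 0), sum the counts (field 1)
        t.print();
        env.execute();
    }
}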
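One possible reading of the output, offered as a sketch rather than a confirmed diagnosis: on a DataStream, keyBy(...).sum(...) is a rolling aggregation, so it emits an updated total for every incoming record instead of a single final total per key. The plain-Java class below imitates that behaviour without Flink. RollingSumSketch and rollingSum are hypothetical names introduced only for illustration, and the input list stands in for the three values parsed from the JSON.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Flink code: mimics a per-key rolling sum.
class RollingSumSketch {

    // Returns one "(key,count)" entry per input record, i.e. the running
    // count for that record's key at the moment the record is processed.
    public static List<String> rollingSum(List<String> values) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String v : values) {
            // Increment the running count for this key and emit it immediately
            int updated = counts.merge(v, 1, Integer::sum);
            emitted.add("(" + v + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same three statuses as in the JSON above, in document order
        List<String> statuses = Arrays.asList("occupied", "occupied", "vacant");
        rollingSum(statuses).forEach(System.out::println);
    }
}
```

The sketch emits three entries for three inputs, which matches the number of results seen in the question. In the real job the order of the printed lines can differ from this single-threaded version, because records with different keys may be handled by different parallel subtasks.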
【Comments】:
Tags: apache-flink flink-streaming