【问题标题】:Flink keyBy grouping issueFlink keyBy 分组问题
【发布时间】:2017-10-19 10:08:59
【问题描述】:

我对 Flink 还是很陌生。我有这段代码可以对输入的 JSON 进行映射、分组和求和。

这与字数统计示例非常相似。

我希望得到(vacant,1) (occupied,2)

但是,由于某种原因,我收到了(occupied,1) (vacant,1) (occupied,2)

  public static void main(String[] args) throws Exception {
        String s = "{\n" +
                "    \"Port_128\": \"occupied\",\n" +
                "    \"Port_129\": \"occupied\",\n" +
                "    \"Port_120\": \"vacant\"\n" +
                "\n" +
                "}";
        StreamExecutionEnvironment env = 
        StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> in = env.fromElements(s);
        SingleOutputStreamOperator<Tuple2<String, Integer>> t = 
        in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> 
            collector) throws Exception {
                ObjectMapper mapper = new ObjectMapper();
                JsonNode node = mapper.readTree(s);
                node.elements().forEachRemaining(v -> {
                    collector.collect(new Tuple2<>(v.textValue(), 1));
                });

            }
        }).keyBy(0).sum(1);

        t.print();
        env.execute();

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    运行你的代码,我得到:

    10/19/2017 11:27:38 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING 
    (occupied,1)
    (occupied,2)
    (vacant,1)
    10/19/2017 11:28:03 Keyed Aggregation -> Sink: Unnamed(1/1) switched to FINISHED 
    

    这与您的输出略有不同,但很重要。原因是代码在接收数据时输出每个键的总和,所以首先它被第一个占用(输出 1),然后是第二个(输出这个键控进程的总和现在是 2),然后将空置发送到另一个键控进程并输出 1。所以这对我来说似乎是正确的输出。

    编辑

    根据下面的评论,这里是可以为您提供所需输出的代码:

    public static void main(String[] args) throws Exception {
      String s = "{\n" +
          "    \"Port_128\": \"occupied\",\n" +
          "    \"Port_129\": \"occupied\",\n" +
          "    \"Port_120\": \"vacant\"\n" +
          "\n" +
          "}";
      ExecutionEnvironment env =
          ExecutionEnvironment.getExecutionEnvironment();
      DataSet<String> in = env.fromElements(s);
      AggregateOperator<Tuple2<String, Integer>> t =
          in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>>
                collector) throws Exception {
              ObjectMapper mapper = new ObjectMapper();
              JsonNode node = mapper.readTree(s);
              node.elements().forEachRemaining(v -> {
                collector.collect(new Tuple2<>(v.textValue(), 1));
              });
    
            }
          }).groupBy(0).sum(1);
    
      t.print();
      env.execute();
    }
    

    【讨论】:

    • 我怎样才能每次都从 KeydAggregation 中阻止它以及它与字数示例有何不同?
    • 您设置了一个 Datatream 程序。 Flink 字数统计示例是一个 DataSet 程序。两者的行为不同。流中的数据在通过管道接收时被处理,因此它处理通过的每个元素。我将通过更改使用 DataSet 代码(如 wordcount 示例)的代码来更新答案。如果你运行它,你会得到你期望的输出。
    • 现在我明白了,我的错误是我使用流而不是数据集
    猜你喜欢
    • 2022-12-02
    • 1970-01-01
    • 1970-01-01
    • 2020-12-24
    • 1970-01-01
    • 2018-02-27
    • 1970-01-01
    • 2019-05-26
    • 1970-01-01
    相关资源
    最近更新 更多