【问题标题】:Spark Group By Key to (String, Iterable<String>)Spark Group By Key (String, Iterable<String>)
【发布时间】:2018-11-25 02:29:19
【问题描述】:

我正在尝试通过键对 urldata 进行分组,其中值将是字符串

样本数据:

url_3 url_2
url_3 url_2
url_3 url_1
url_4 url_3
url_4 url_1

预期结果:

(url_3,(url_2,url_1))
(url_4,(url_3,url_1))

1) 加载urldata:

Dataset<String> lines = spark.read()
    .textFile("C:/Users/91984/workspace/myApp/src/test/resources/in/urldata.txt");

2) 使用空间分割数据集

Encoder<Tuple2<String, String>> encoder2 = 
    Encoders.tuple(Encoders.STRING(), Encoders.STRING());
Dataset<Tuple2<String, String>> tupleRDD = lines.map(f->{
    Tuple2<String, String> m = 
        new Tuple2<String, String>(f.split(" ")[0], f.split(" ")[1]);
    return m;
},encoder2);

3) 使用 groupbyKey 将 tupleRDD 数据库按 key 分组

KeyValueGroupedDataset<String, Tuple2<String, String>> keygrpDS = 
    tupleRDD.groupByKey(f->f._1, Encoders.STRING());

谁能解释一下为什么 groupByKey 在第 3 步返回 KeyValueGroupedDataset&lt;String, Tuple2&lt;String, String&gt;&gt; 而不是 KeyValueGroupedDataset&lt;String, Iterable&lt;String&gt;&gt; 以及为了获得预期结果需要做哪些更改。

【问题讨论】:

  • 如果您仍然遇到问题,请接受正确的答案或说明任何问题。

标签: java apache-spark apache-spark-sql


【解决方案1】:

这就是它在 spark 中处理数据集的方式。当你有一个Dataset&lt;T&gt; 类型的数据集时,你可以通过一些映射函数对它进行分组,该函数接受一个类型为 T 的对象并返回一个类型为 K 的对象(键)。你得到的是一个KeyValueGroupedDataset&lt;K,T&gt;,你可以在它上面调用一个聚合函数(见the javadoc)。在您的情况下,您可以使用mapGroups,您可以提供一个函数,将键K 和可迭代的Iterable&lt;T&gt; 映射到您选择的新对象R。如果有帮助,在您的代码中,T 是 Tuple2,K 是 URL。

【讨论】:

    【解决方案2】:

    Spark 要求您使用 aggregation 方法关注您的 groupBY。我会将 tupleRDD 作为DataFrame 之类的:

    column1 column2
    
    url_3 url_2
    url_3 url_2
    url_3 url_1
    url_4 url_3
    url_4 url_1
    

    并传递collect_list(column2) 喜欢

    df.groupBy('column1').agg('column2', collect_list('column2'))

    这个例子是用 Python 编写的。不过,Scala/Java API 应该是类似的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-05-23
      • 1970-01-01
      • 2022-12-02
      • 1970-01-01
      • 1970-01-01
      • 2020-09-03
      • 2021-04-10
      相关资源
      最近更新 更多