排序通常应该在调用 collect() 之前完成,因为这会将数据集返回给驱动程序,这也是在 java 中编写 hadoop map-reduce 作业的方式,以便编写您想要的最终输出(通常)到 HDFS。借助 spark API,这种方法可以灵活地将输出以“原始”形式写入您想要的位置,例如写入可用作进一步处理输入的文件。
可以按照 eliasah 的建议在 collect() 之前使用 spark 的 scala API 排序,并使用 Tuple2.swap() 两次,一次是在排序之前,一次是在排序之后,以便生成按第二个递增或递减顺序排序的元组列表字段(名为 _2)并包含第一个字段(名为 _1)中的单词数。下面是如何在 spark-shell 中编写脚本的示例:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _, 1) // 2nd arg configures one task (same as number of partitions)
.map(item => item.swap) // interchanges position of entries in each tuple
.sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
.map(item => item.swap)
为了反转排序的顺序,请使用 sortByKey(false,1),因为它的第一个 arg 是升序的布尔值。它的第二个参数是任务数(相当于分区数),设置为 1 用于测试只需要一个输出数据文件的小输入文件; reduceByKey 也采用这个可选参数。
在此之后,wordCounts RDD 可以作为文本文件保存到带有 saveAsTextFile(directory_pathname) 的目录中,其中将存放一个或多个 part-xxxxx 文件
(从part-00000 开始)取决于为作业配置的reducer 数量(每个reducer 1 个输出数据文件),一个_SUCCESS 文件,取决于作业是否成功和.crc 文件。
使用 pyspark 一个与上面显示的 scala 脚本非常相似的 python 脚本产生的输出实际上是相同的。这是演示按值对集合进行排序的 pyspark 版本:
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
.map(lambda (a, b): (b, a)) \
.sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
.map(lambda (a, b): (b, a))
为了按降序排序,它的第一个 arg 应该是 0。由于 python 将前导和尾随空格作为数据捕获,因此在将每一行拆分为空格之前插入 strip(),但使用 spark-shell/scala 不需要这样做.
spark 和 python 版本的 wordCount 输出的主要区别在于 spark 输出 (word,3) python 输出 (u'word', 3) 的地方。
有关 spark RDD 方法的更多信息,请参阅http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for python 和 https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD for scala。
在 spark-shell 中,在 wordCounts 上运行 collect() 会将其从 RDD 转换为 Array[(String, Int)] = Array[Tuple2(String,Int)] 本身可以在每个 Tuple2 元素使用:
Array.sortBy(_._2)
sortBy 还采用可选的隐式数学。Ordering 参数,例如 Romeo Kienzler 在此问题的先前答案中显示。 Array.sortBy(_._2) 将通过在运行 map-reduce 脚本之前定义隐式反向排序来对其 _2 字段上的 Array Tuple2 元素进行反向排序,因为它会覆盖 Int 的预先存在的排序。 Romeo Kienzler 已经定义的反向 int 排序是:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
另一种定义这种反向排序的常用方法是颠倒 a 和 b 的顺序,并将 (-1) 放在比较定义的右侧:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = b.compare(a)
}