【问题标题】:Spark get collection sorted by valueSpark获取按值排序的集合
【发布时间】:2014-08-30 15:45:57
【问题描述】:

我正在尝试这个教程http://spark.apache.org/docs/latest/quick-start.html 我首先从一个文件创建了一个集合

textFile = sc.textFile("README.md")

然后我尝试了一个命令来计算单词:

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

要打印集合:

 wordCounts.collect()

我找到了如何使用命令 sortByKey 按单词排序。我想知道如何对按值排序做同样的事情,在这种情况下,在文档中出现一个单词的数字。

【问题讨论】:

  • 你可以试试这样的:textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b).map(item => item.swap).sortByKey() 吗?
  • @user3702916 - eliasah 的解决方案适用于 Scala API。将其转换为 Python,这应该可以工作。所以不要使用map(item => item.swap) 试试map(lambda (x,y): (y,x))
  • 我们如何在 Java API 中做到这一点?
  • ...reduceByKey(lambda a, b: a+b).map(lambda x: (x[1],x[0])).sortByKey() sortByKey(0) 用于 python 中的降序

标签: sorting apache-spark word-count


【解决方案1】:

排序通常应该在调用 collect() 之前完成,因为这会将数据集返回给驱动程序,这也是在 java 中编写 hadoop map-reduce 作业的方式,以便编写您想要的最终输出(通常)到 HDFS。借助 spark API,这种方法可以灵活地将输出以“原始”形式写入您想要的位置,例如写入可用作进一步处理输入的文件。

可以按照 eliasah 的建议在 collect() 之前使用 spark 的 scala API 排序,并使用 Tuple2.swap() 两次,一次是在排序之前,一次是在排序之后,以便生成按第二个递增或递减顺序排序的元组列表字段(名为 _2)并包含第一个字段(名为 _1)中的单词数。下面是如何在 spark-shell 中编写脚本的示例:

// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)

为了反转排序的顺序,请使用 sortByKey(false,1),因为它的第一个 arg 是升序的布尔值。它的第二个参数是任务数(相当于分区数),设置为 1 用于测试只需要一个输出数据文件的小输入文件; reduceByKey 也采用这个可选参数。

在此之后,wordCounts RDD 可以作为文本文件保存到带有 saveAsTextFile(directory_pathname) 的目录中,其中将存放一个或多个 part-xxxxx 文件 (从part-00000 开始)取决于为作业配置的reducer 数量(每个reducer 1 个输出数据文件),一个_SUCCESS 文件,取决于作业是否成功和.crc 文件。

使用 pyspark 一个与上面显示的 scala 脚本非常相似的 python 脚本产生的输出实际上是相同的。这是演示按值对集合进行排序的 pyspark 版本:

file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
    .map(lambda (a, b): (b, a)) \
    .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
    .map(lambda (a, b): (b, a))

为了按降序排序,它的第一个 arg 应该是 0。由于 python 将前导和尾随空格作为数据捕获,因此在将每一行拆分为空格之前插入 strip(),但使用 spark-shell/scala 不需要这样做.

spark 和 python 版本的 wordCount 输出的主要区别在于 spark 输出 (word,3) python 输出 (u'word', 3) 的地方。

有关 spark RDD 方法的更多信息,请参阅http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for python 和 https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD for scala。

在 spark-shell 中,在 wordCounts 上运行 collect() 会将其从 RDD 转换为 Array[(String, Int)] = Array[Tuple2(String,Int)] 本身可以在每个 Tuple2 元素使用:

Array.sortBy(_._2) 

sortBy 还采用可选的隐式数学。Ordering 参数,例如 Romeo Kienzler 在此问题的先前答案中显示。 Array.sortBy(_._2) 将通过在运行 map-reduce 脚本之前定义隐式反向排序来对其 _2 字段上的 Array Tuple2 元素进行反向排序,因为它会覆盖 Int 的预先存在的排序。 Romeo Kienzler 已经定义的反向 int 排序是:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

另一种定义这种反向排序的常用方法是颠倒 a 和 b 的顺序,并将 (-1) 放在比较定义的右侧:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}   

【讨论】:

  • 此解决方案是否也适用于火花流。我的意思是这会跨批次对单词进行排序。
【解决方案2】:

以更pythonic的方式来做。

# In descending order
''' The first parameter tells number of elements
    to be present in output.
''' 
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])

【讨论】:

    【解决方案3】:

    对于那些希望按值排序的前 N ​​个元素:

    theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))
    

    如果您想按字符串长度排序。

    另一方面,如果值已经采用适合您所需排序的形式,则:

    theRDD.takeOrdered(N, lambda (key, value): -1 * value)
    

    足够了。

    【讨论】:

      【解决方案4】:

      你可以这样做

      // for reverse order
      implicit val sortIntegersByString = new Ordering[Int] {
          override def compare(a: Int, b: Int) = a.compare(b)*(-1)
      }
      
      counts.collect.toSeq.sortBy(_._2)
      

      所以基本上你将你的 RDD 转换为一个序列并使用 sort 方法对其进行排序。

      上面的块全局更改排序行为以获得降序排序。

      【讨论】:

        【解决方案5】:

        我认为您可以使用here 记录的通用sortBy 转换(不是一个动作,即它返回一个RDD 而不是一个数组)。

        所以在你的情况下,你可以这样做

        wordCounts.sortBy(lambda (word, count): count)
        

        【讨论】:

          【解决方案6】:

          按值对输出进行排序的最简单方法。在 reduceByKey 之后,您可以将输出如键作为值和值作为键交换,然后您可以应用 sortByKey 方法,其中错误按降序排序。默认情况下,它将按升序排序。

           val test=textFile.flatMap(line=> line.split(" ")).map(word=> (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)
          

          【讨论】:

            【解决方案7】:

            @kef 为 python 提供的解决方案就在 ...

            以下需要更改-

            .map(lambda (a, b): (b, a))
            

            .map(lambda a: (a[1], a[0]))
            

            【讨论】:

              【解决方案8】:

              我设法使用 Python 解决了这个问题。所以我创建了一个对值列表并按值排序:

              out = wordCounts.collect()
              outSort = sorted(out, key=lambda word:word[1])
              

              【讨论】:

              • 您正在将所有结果收集回驱动程序并在那里进行排序。它会起作用,但前提是您的结果集相对较小。如需大规模运行的解决方案,请参阅 eliasah 的解决方案。
              • 这并不能解决大数据的问题。如果数据很小,为什么还需要 spark。
              【解决方案9】:
               wordCounts.map(lambda (a,b) : (b,a)).sortByKey(ascending=False).map(lambda (a,b) : (b,a)).collect()
              

              此解决方案有效,因为 wordCount rdd 的每一行看起来像这样:

              (字数,计数)

              第一个映射产生一个 rdd,元组的顺序颠倒了,即现在它们看起来像这样

              (计数,字)

              现在,当我们执行 sortByKey 时,COUNT 被视为我们想要的键。 然后第二个映射将现在排序的第二个 rdd 映射回

              的原始格式

              (字数,计数)

              对于每一行,但不是现在,这些行按字数排序。

              这里的一个隐含假设是映射不会改变 RDD 行的顺序,否则第二个映射可能会弄乱排序。

              【讨论】:

              • 这个答案需要一些解释......你不能只删除一些代码:stackoverflow.com/help/how-to-answer
              • 只发布一个方程式并没有多大帮助,除非你解释它在做什么。
              【解决方案10】:

              使用 SCALA 进行 sortByValue 的更好方法是

              val count = oozie.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).sortBy(x => x._2)
              

              x._2 表示任何列表 x 的第二个元素。

              按降序排序“-x._2”

              scala> val count = oozie.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).sortBy(x => -x._2)
              
              count: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at sortBy at <console>:25
              
              scala> count.take(10)
              res6: Array[(String, Int)] = Array((the,4603), (to,1707), (and,1595), (of,1337), (a,1319), (Oozie,1302), (in,1131), (.,994), (is,956), (for,753))
              

              【讨论】:

                【解决方案11】:

                Python 方式:现在您可以编写一个映射以在键(第一个)位置获取值(第二个元素)-> 按该键(含义值)排序-> 再次更改位置。简单:)

                wordCounts.map(lambda pair: (pair[1], pair[0])).sortByKey().map(lambda pair: (pair[1], pair[0]))
                

                【讨论】:

                • 请在您的答案中添加一些解释,以便其他人可以从中学习
                猜你喜欢
                • 2021-09-20
                • 1970-01-01
                • 1970-01-01
                • 2019-05-26
                • 1970-01-01
                • 2015-10-18
                • 2021-01-15
                • 1970-01-01
                • 2016-09-15
                相关资源
                最近更新 更多