【问题标题】:how to order spark RDD based on two columns如何根据两列订购火花RDD
【发布时间】:2017-01-16 14:14:43
【问题描述】:

我有以下 RDD 数据集:

ABC [G4, G3, G1]    3
FFF [G5, G4, G3]    3
CDE [G5,G4,G3,G2]   4
XYZ [G4, G3]    2

需要先按最后一列 desc 排序,如果 last col 相同,则按第一个元组项 desc 顺序排序。预期结果是

CDE [G5,G4,G3,G2]   4
FFF [G5, G4, G3]    3
ABC [G4, G3, G1]    3
XYZ [G4, G3]    2

提前致谢。

【问题讨论】:

    标签: scala hadoop apache-spark rdd


    【解决方案1】:

    你可以使用sortBy:

    rdd.sortBy(r => (r._3, r._2(0)), false)
    

    在上面,r._3 代表最后一列,r._2(0) 代表第二列的第一个元素(这是一个数组),false 指定顺序应该是降序。请记住,由于洗牌,排序是一项昂贵的操作。

    更新

    如果我们假设您以 pair rdd 开头,这是一个可重现的示例:

    /// Generate data
    val rdd = sc.parallelize(Seq(("ABC","G4"),("ABC","G3"),
                                 ("ABC","G1"),("FFF","G5"),
                                 ("FFF","G4"),("FFF","G3"),
                                 ("CDE","G5"),("CDE","G4"),                             
                                 ("CDE","G3"),("CDE","G2"),
                                 ("XYZ","G4"),("XYZ","G3")))
    
    /// Put values in a list and calculate its size
    val rdd_new = rdd.groupByKey.mapValues(_.toList).map(x => (x._1, x._2, x._2.size))
    
    /// Now this works
    rdd_new.sortBy(r => (r._3, r._2(0)), false).collect()
    /// Array[(String, List[String], Int)] = Array((CDE,List(G5, G4, G3, G2),4), (FFF,List(G5, G4, G3),3), (ABC,List(G4, G3, G1),3), (XYZ,List(G4, G3),2))
    

    【讨论】:

    • Mtoto,我试过了,但结果看起来并不完全符合预期。 (CDE ,[ G5, G4, G3, G2],4) (ABC ,[ G4, G3, G1],3) (FFF ,[ G5, G4, G3],3) (XYZ ,[ G4, G2], 2)。它按最后一列 desc 正确排序,但按数组中的第一项不正确。
    • 您好 Phoenix/Mtoto,感谢您的帮助。因为我对 Spark 真的很陌生。我想我确实正确解释了这是另一个过程的结果。我为这个问题打开了另一个流。请您通过以下链接帮助我:stackoverflow.com/questions/41681804/…。谢谢你的帮助。
    • 您需要做的是分享一个可重现的数据集示例,您链接的新问题与这个问题基本相同。问题可能是您的第二列是一个长字符串,您需要先将其转换为数组,然后上面的应该可以工作。
    • rdd.sortBy(r => (r._3, r._1), false) - 试试这个
    【解决方案2】:

    我不确定为什么上述答案不起作用。我觉得很好。试试这段代码吧。

    这是我的输入:

    i1,array1,10
    i5,array2,50
    i4,array3,20
    i2,array4,20
    

    代码:

    val idRDD = sc.textFile(inputPath)
    
    val idSOrted = idRDD.map { rec => ((rec.split(",")(2),rec.split(",")(0)),(rec.split(",")(1))) }.sortByKey(false).map(rec=>(rec._1._1,rec._2,rec._1._2))
    

    这里是 o/p:

    (50,array2,i5)
    (20,array3,i4)
    (20,array4,i2)
    (10,array1,i1)
    

    【讨论】:

      猜你喜欢
      • 2023-03-13
      • 1970-01-01
      • 2017-02-28
      • 1970-01-01
      • 1970-01-01
      • 2017-01-26
      • 1970-01-01
      • 1970-01-01
      • 2014-08-30
      相关资源
      最近更新 更多