【问题标题】:Scala RDD count by rangeScala RDD 按范围计数
【发布时间】:2018-06-03 22:31:03
【问题描述】:

我需要“提取”一些包含在 Iterable[MyObject] 中的数据(在 groupBy 之前是 RDD[MyObject])。

我的初始 RDD[MyObject] :

|-----------|---------|----------|
| startCity | endCity | Customer |
|-----------|---------|----------|
| Paris     | London  | ID | Age |
|           |         |----|-----|
|           |         |  1 | 1   |
|           |         |----|-----|
|           |         |  2 | 1   |
|           |         |----|-----|
|           |         |  3 | 50  |
|-----------|---------|----------|
| Paris     | London  | ID | Age |
|           |         |----|-----|
|           |         |  5 | 40  |
|           |         |----|-----|
|           |         |  6 | 41  |
|           |         |----|-----|
|           |         |  7 | 2   |
|-----------|---------|----|-----|
| New-York  | Paris   | ID | Age |
|           |         |----|-----|
|           |         |  9 | 15  |
|           |         |----|-----|
|           |         |  10| 16  |
|           |         |----|-----|
|           |         |  11| 46  |
|-----------|---------|----|-----|
| New-York  | Paris   | ID | Age |
|           |         |----|-----|
|           |         |  13| 7   |
|           |         |----|-----|
|           |         |  14| 9   |
|           |         |----|-----|
|           |         |  15| 60  |
|-----------|---------|----|-----|
| Barcelona | London  | ID | Age |
|           |         |----|-----|
|           |         |  17| 66  |
|           |         |----|-----|
|           |         |  18| 53  |
|           |         |----|-----|
|           |         |  19| 11  |
|-----------|---------|----|-----|

我需要按年龄范围和 groupBy startCity - endCity 计算它们

最终结果应该是:

|-----------|---------|-------------|
| startCity | endCity | Customer    |
|-----------|---------|-------------|
| Paris     | London  | Range| Count|
|           |         |------|------|
|           |         |0-2   | 3    |
|           |         |------|------|
|           |         |3-18  | 0    |
|           |         |------|------|
|           |         |19-99 | 3    |
|-----------|---------|-------------|
| New-York  | Paris   | Range| Count|
|           |         |------|------|
|           |         |0-2   | 0    |
|           |         |------|------|
|           |         |3-18  | 3    |
|           |         |------|------|
|           |         |19-99 | 2    |
|-----------|---------|-------------|
| Barcelona | London  | Range| Count|
|           |         |------|------|
|           |         |0-2   | 0    |
|           |         |------|------|
|           |         |3-18  | 1    |
|           |         |------|------|
|           |         |19-99 | 2    |
|-----------|---------|-------------|

目前我正在通过计数 3 次相同的数据来执行此操作(第一次是 0-2 范围,然后是 10-20,然后是 21-99)。

喜欢:

Iterable[MyObject] ite

ite.count(x => x.age match {
    case Some(age) => { age >= 0 && age < 2 }
}

它通过给我一个整数来工作,但我认为它根本没有效率,因为我必须计算很多次,请问最好的方法是什么?

谢谢

EDIT : Customer 对象是一个案例类

【问题讨论】:

  • Customer 是一个数组吗?
  • 客户是一个对象
  • 能否分享对象类型以便我们为您提供帮助。是案例类还是类似的东西?
  • 我编辑了帖子,Oli 回答了我的问题,谢谢 :)

标签: scala apache-spark rdd iterable


【解决方案1】:

假设您有Customer[Object] 作为case class 如下

case class Customer(ID: Int, Age: Int)

您的RDD[MyObject]rddcase class,如下所示

case class MyObject(startCity: String, endCity: String, customer: List[Customer])

所以使用上面的case classes 你应该有如下输入(你有表格格式)

MyObject(Paris,London,List(Customer(1,1), Customer(2,1), Customer(3,50)))
MyObject(Paris,London,List(Customer(5,40), Customer(6,41), Customer(7,2)))
MyObject(New-York,Paris,List(Customer(9,15), Customer(10,16), Customer(11,46)))
MyObject(New-York,Paris,List(Customer(13,7), Customer(14,9), Customer(15,60)))
MyObject(Barcelona,London,List(Customer(17,66), Customer(18,53), Customer(19,11)))

你还提到,分组后你有Iterable[MyObject],相当于下面的步骤

val groupedRDD = rdd.groupBy(myobject => (myobject.startCity, myobject.endCity))   //groupedRDD: org.apache.spark.rdd.RDD[((String, String), Iterable[MyObject])] = ShuffledRDD[2] at groupBy at worksheetTest.sc:23

所以下一步你要做的就是使用mapValues 遍历Iterable[MyObject],然后计算属于每个范围的ages,最后转换为你需要的输出,如下所示

val finalResult = groupedRDD.mapValues(x => {
  val rangeAge = Map("0-2" -> 0, "3-18" -> 0, "19-99" -> 0)
  val list = x.flatMap(y => y.customer.map(z => z.Age)).toList
  updateCounts(list, rangeAge).map(x => CustomerOut(x._1, x._2)).toList
})

其中updateCounts 是一个递归函数

def updateCounts(ageList: List[Int], map: Map[String, Int]) : Map[String, Int] = ageList match{
  case head :: tail => if(head >= 0 && head < 3) {
    updateCounts(tail, map ++ Map("0-2" -> (map("0-2")+1)))
  } else if(head >= 3 && head < 19) {
    updateCounts(tail, map ++ Map("3-18" -> (map("3-18")+1)))
  } else updateCounts(tail, map ++ Map("19-99" -> (map("19-99")+1)))
  case Nil => map
}

CustomerOut 是另一个case class

case class CustomerOut(Range: String, Count: Int)

所以finalResult 如下

((Barcelona,London),List(CustomerOut(0-2,0), CustomerOut(3-18,1), CustomerOut(19-99,2)))
((New-York,Paris),List(CustomerOut(0-2,0), CustomerOut(3-18,4), CustomerOut(19-99,2)))
((Paris,London),List(CustomerOut(0-2,3), CustomerOut(3-18,0), CustomerOut(19-99,3)))

【讨论】:

    【解决方案2】:
    def computeRange(age : Int) = 
        if(age<=2)
            "0-2"
        else if(age<=10)
            "2-10"
        // etc, you get the idea
    

    然后,RDD 为case class MyObject(id : String, age : Int)

    rdd
       .map(x=> computeRange(x.age) -> 1)
       .reduceByKey(_+_)
    

    编辑: 如果您需要按某些列进行分组,您可以这样做,前提是您有一个 RDD[(SomeColumns, Iterable[MyObject])]。以下几行将为您提供一个将每个“范围”与其出现次数相关联的地图。

    def computeMapOfOccurances(list : Iterable[MyObject]) : Map[String, Int] =
        list
            .map(_.age)
            .map(computeRange)
            .groupBy(x=>x)
            .mapValues(_.size)
    
    val result1 = rdd
        .mapValues( computeMapOfOccurances(_))
    

    如果你需要扁平化你的数据,你可以这样写:

    val result2 = result1
        .flatMapValues(_.toSeq)    
    

    【讨论】:

    • 谢谢,它与 RDD[MyObject] 的作用就像一个魅力,但我有一个 Iterable[MyObject],因为我按我的初始 RDD[MyObject] 分组,而使用 Iterable 则不能使用 reduceByKey。我最初的问题现在已编辑,因为我不清楚,抱歉
    猜你喜欢
    • 1970-01-01
    • 2017-05-28
    • 2015-02-19
    • 1970-01-01
    • 2016-05-06
    • 2023-03-15
    • 2017-12-03
    • 2018-05-25
    • 1970-01-01
    相关资源
    最近更新 更多