【Title】:Join two ordinary RDDs with/without Spark SQL
【Posted】:2016-08-22 20:08:07
【Question】:

I need to join two ordinary RDDs on one or more columns. Logically this operation is equivalent to a database join of two tables. I would like to know whether this is only possible through Spark SQL, or whether there are other ways to do it.

As a concrete example, consider RDD r1 with primary key ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)

and RDD r2 with primary key COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)

I want to join r1 and r2.

How can this be done?
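
A minimal, untested sketch of the kind of join being asked about, using only plain pair RDDs; the sample values and variable names below are invented for illustration, and a running SparkContext sc is assumed:

// r1: (ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)
val r1 = sc.parallelize(Seq(("i1", "first", 2, "c1"), ("i2", "second", 2, "c1")))
// r2: (COMPANY_ID, COMPANY_NAME, COMPANY_CITY)
val r2 = sc.parallelize(Seq(("c1", "company-1", "city-1")))

// Key both RDDs by COMPANY_ID and use the pair-RDD join (no Spark SQL involved)
val byCompanyItems = r1.map { case (id, name, unit, companyId) => (companyId, (id, name, unit)) }
val byCompanyComps = r2.map { case (companyId, cName, cCity) => (companyId, (cName, cCity)) }
val joined = byCompanyItems.join(byCompanyComps)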

【Comments】:

    Tags: scala join apache-spark rdd apache-spark-sql


    【Solution 1】:

    Soumya Simanta gave a good answer. However, the values in the joined RDD are Iterables, so the result may not look quite like an ordinary table join.

    Alternatively, you can:

    // Key both RDDs by companyId, then use the pair-RDD join on that key
    val mappedItems = items.map(item => (item.companyId, item))
    val mappedComp = companies.map(comp => (comp.companyId, comp))
    mappedItems.join(mappedComp).take(10).foreach(println)
    

    The output will be:

    (c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
    (c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
    (c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
    

    【Comments】:

    • Your maps are the same as items.keyBy{_.companyId} and companies.keyBy{_.companyId}. Since those are part of Spark, might they be more efficient?
    • @Paul Here is the Spark source code for keyBy: def keyBy[K](f: T => K): RDD[(K, T)] = { map(x => (f(x), x)) }, so your solution is exactly the same as @virya's solution
    • OK :) Still, keyBy probably makes the intent a little clearer (a sketch of that variant follows these comments). That's beside the point, though.
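
    For reference, a minimal untested sketch of the keyBy variant discussed in the comments above, assuming the same items and companies RDDs (of Item and Company case classes) used in this answer:

    // keyBy(f) is implemented as map(x => (f(x), x)), so this is the same join keyed by companyId
    val keyedItems = items.keyBy(_.companyId)
    val keyedComp = companies.keyBy(_.companyId)
    keyedItems.join(keyedComp).take(10).foreach(println)
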
    【Solution 2】:

    (Using Scala) Suppose you have two RDDs:

    • emp: (empid, ename, dept)

    • dept: (dname, dept)

    Here is another way to do it:

    //val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
    val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))
    
    val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
    
    //val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2))
    val shifted_fields_emp = emp.map(t => (t._2, t._1))
    
    val shifted_fields_dept = dept.map(t => (t._2,t._1))
    
    shifted_fields_emp.join(shifted_fields_dept)
    // Create emp RDD
    val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
    
    // Create dept RDD
    val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))
    
    // Establishing that the third field is to be considered as the Key for the emp RDD
    val manipulated_emp = emp.keyBy(t => t._3)
    
    // Establishing that the second field need to be considered as the Key for dept RDD
    val manipulated_dept = dept.keyBy(t => t._2)
    
    // Inner Join
    val join_data = manipulated_emp.join(manipulated_dept)
    // Left Outer Join
    val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
    // Right Outer Join
    val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)
    // Full Outer Join
    val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)
    
    // Formatting the Joined Data for better understandable (using map)
    val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))
    

    This will give output like the following:

    // Print cleaned_joined_data to the console

    scala> cleaned_joined_data.collect()
    res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
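
    The outer joins created above wrap the possibly-missing side in an Option, so formatting them needs one extra step. A rough, untested sketch for the left outer join; field positions follow the answer's code, and "NO_DEPT" is an invented placeholder:

    // leftOuterJoin yields (deptId, (empRecord, Option[deptRecord])); unmatched emp rows carry None
    val cleaned_left_outer_join_data = left_outer_join_data.map {
      case (deptId, (emp, deptOpt)) => (emp._1, emp._2, deptId, deptOpt.map(_._1).getOrElse("NO_DEPT"))
    }
    // e.g. the emp row (4,mince,35) has no matching dept, so it would appear as (4,mince,35,NO_DEPT)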
    

    【Comments】:

      【Solution 3】:

      Something like this should work.

      scala> case class Item(id:String, name:String, unit:Int, companyId:String)
      
      scala> case class Company(companyId:String, name:String, city:String)
      
      scala> val i1 = Item("1", "first", 2, "c1")
      
      scala> val i2 = i1.copy(id="2", name="second")
      
      scala> val i3 = i1.copy(id="3", name="third", companyId="c2")
      
      scala> val items = sc.parallelize(List(i1,i2,i3))
      items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20
      
      scala> val c1 = Company("c1", "company-1", "city-1")
      
      scala> val c2 = Company("c2", "company-2", "city-2")
      
      scala> val companies = sc.parallelize(List(c1,c2))
      
      scala> val groupedItems = items.groupBy( x => x.companyId) 
      groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22
      
      scala> val groupedComp = companies.groupBy(x => x.companyId)
      groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20
      
      scala> groupedItems.join(groupedComp).take(10).foreach(println)
      
      14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s
      (c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1))))
      (c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2))))
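
      Since the grouped join above returns an Iterable on each side, here is a small sketch (my addition, not part of the original answer) of flattening the result into one (Item, Company) pair per match, which looks more like an ordinary table join:

      // Expand the Iterable pairs into flat (Item, Company) rows
      val flattened = groupedItems.join(groupedComp).flatMap {
        case (companyId, (itemIter, compIter)) =>
          for (item <- itemIter; comp <- compIter) yield (item, comp)
      }
      flattened.take(10).foreach(println)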
      

      【Comments】:

        【Solution 4】:

        Spark SQL can perform joins on Spark RDDs.

        The following code performs a SQL join on the Company and Items RDDs:

        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.sql.SQLContext

        object SparkSQLJoin {
        
        case class Item(id:String, name:String, unit:Int, companyId:String)
        case class Company(companyId:String, name:String, city:String)
        
        def main(args: Array[String]) {
        
            val sparkConf = new SparkConf()
            val sc= new SparkContext(sparkConf)
            val sqlContext = new SQLContext(sc)
        
            import sqlContext.createSchemaRDD
        
            val i1 = Item("1", "first", 1, "c1")
            val i2 = Item("2", "second", 2, "c2")
            val i3 = Item("3", "third", 3, "c3")
            val c1 = Company("c1", "company-1", "city-1")
            val c2 = Company("c2", "company-2", "city-2")
        
            val companies = sc.parallelize(List(c1,c2))
            companies.registerAsTable("companies")
        
            val items = sc.parallelize(List(i1,i2,i3))
            items.registerAsTable("items")
        
            val result = sqlContext.sql("SELECT * FROM companies C JOIN items I ON C.companyId= I.companyId").collect
        
            result.foreach(println)
        
            }
        }
        

        The output is displayed as:

             [c1,company-1,city-1,1,first,1,c1]
             [c2,company-2,city-2,2,second,2,c2]
        

        【Comments】:

        • I have quite a few columns, so I need to specify the schema programmatically. Also, the RDDs are created from large text files on HDFS. I believe the above approach still works, right? Please let me know if any changes are needed.
        • Yes, this approach works on big data too. To define the schema programmatically, see spark.apache.org/docs/latest/… (a sketch follows below).
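
        For the programmatic schema mentioned in this comment, a rough, untested sketch using the newer DataFrame API (Spark 1.3+); the column names, file path, and delimiter here are assumptions for illustration, and sc / sqlContext are the contexts from the answer's code:

        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types.{StringType, StructField, StructType}

        // Build the items schema at runtime instead of via a case class
        val itemSchema = StructType(Seq(
          StructField("id", StringType),
          StructField("name", StringType),
          StructField("unit", StringType),
          StructField("companyId", StringType)))

        // Hypothetical comma-delimited file on HDFS; each line becomes a Row
        val itemRows = sc.textFile("hdfs:///path/to/items.txt")
          .map(_.split(","))
          .map(a => Row(a(0), a(1), a(2), a(3)))

        val itemsDF = sqlContext.createDataFrame(itemRows, itemSchema)
        itemsDF.registerTempTable("items")   // then join it with SQL as above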