【问题标题】:Join two RDD in spark在 spark 中加入两个 RDD
【发布时间】:2016-01-24 03:11:38
【问题描述】:

我有两个 rdd,一个 rdd 只有一列,另外有两列连接键上的两个 RDD 我添加了 0 的虚拟值,有没有其他有效的方法来使用 join ?

val lines = sc.textFile("ml-100k/u.data")
val movienamesfile = sc.textFile("Cml-100k/u.item")

val moviesid = lines.map(x => x.split("\t")).map(x => (x(1),0))
val test = moviesid.map(x => x._1)
val movienames = movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val shit = movienames.join(moviesid).distinct()

编辑

让我把这个问题转换成 SQL。比如说我有table1 (moveid)table2 (movieid,moviename)。在 SQL 中,我们编写如下内容:

select moviename, movieid, count(1)
from table2 inner join table table1 on table1.movieid=table2.moveid 
group by ....

在 SQL 中,table1 只有一列,而 table2 有两列,join 仍然有效,在 Spark 中同样的方式可以连接来自两个 RDD 的键。

【问题讨论】:

  • 你的问题不是很清楚。你能重新制定吗?
  • 您是否正在尝试实现过滤,例如内部连接?
  • 是和内连接 dataset1=123,starwars 一样;数据集2=123; dataset1.join(datset2) 失败,因为 dataset2 缺少一个元素,所以我需要在 dataset2=123,0 中添加默认值;那么如果 dataset2 包含的元素数量较少,连接是否有效?
  • 你能给出你声明的值的类型吗?
  • 我认为this post 有您正在寻找的答案。

标签: scala apache-spark


【解决方案1】:

Join 操作仅在PairwiseRDDs 上定义,这与 SQL 中的关系/表完全不同。 PairwiseRDD 的每个元素都是Tuple2,其中第一个元素是key,第二个元素是value。两者都可以包含复杂对象,只要key 提供有意义的hashCode

如果您想在 SQL 中考虑这一点,您可以考虑 key,因为 ON 子句和 value 的所有内容都包含选定的列。

SELECT table1.value, table2.value
FROM table1 JOIN table2 ON table1.key = table2.key

虽然这些方法乍一看很相似,但您可以使用另一种方法来表达一种方法,但它们有一个根本区别。当您查看 SQL 表并忽略约束时,所有列都属于同一类对象,而 PairwiseRDD 中的 keyvalue 具有明确的含义。

回到您的问题以使用join,您需要keyvalue。可以说比使用0 作为占位符更干净的是使用null 单例,但实际上没有办法绕过它。

对于小数据,您可以使用过滤器以类似的方式广播加入:

val moviesidBD = sc.broadcast(
  lines.map(x => x.split("\t")).map(_.head).collect.toSet)

movienames.filter{case (id, _) => moviesidBD.value contains id}

但如果你真的想要 SQL 式连接,那么你应该简单地使用 SparkSQL。

val movieIdsDf = lines
   .map(x => x.split("\t"))
   .map(a => Tuple1(a.head))
   .toDF("id")

val movienamesDf = movienames.toDF("id", "name")

// Add optional join type qualifier 
movienamesDf.join(movieIdsDf, movieIdsDf("id") <=> movienamesDf("id"))

【讨论】:

    【解决方案2】:

    在RDD上Join操作只针对PairwiseRDDs定义,所以需要将值改为pairedRDD。下面是一个示例

      val rdd1=sc.textFile("/data-001/part/")
      val rdd_1=rdd1.map(x=>x.split('|')).map(x=>(x(0),x(1)))
      val rdd2=sc.textFile("/data-001/partsupp/")
      val rdd_2=rdd2.map(x=>x.split('|')).map(x=>(x(0),x(1)))
    
      rdd_1.join(rdd_2).take(2).foreach(println)
    

    【讨论】:

      猜你喜欢
      • 2016-09-07
      • 2017-06-27
      • 2018-11-24
      • 2019-01-01
      • 2017-11-15
      • 1970-01-01
      • 1970-01-01
      • 2015-10-18
      • 2015-06-15
      相关资源
      最近更新 更多