【问题标题】:Union of elements of two RDDs两个 RDD 元素的联合
【发布时间】:2015-12-16 16:51:57
【问题描述】:

我想在 scala 中合并两个 RDD。我无法将它们中的任何一个存储在内存中,因为它们非常大。

A = {k1->List(A,B,C), k2->List(W,E,Q)}
B = {k1->List(D,E,F), k2->List(E,U,O)}

我怎样才能将A和B联合起来得到

{(A,B,C,D,E,F),(W,E,Q,U,O)}

谢谢, 南希

【问题讨论】:

  • 你能写出真正的代码吗?因为.union 完全符合您的要求,我担心您的 RDD 与您描述的不同。
  • 我有两个像这样的RDD: List(String)> 我已经按键排序了。我想做的是两个 RDD 值的列并集。 val a3 = a2.map {case (k,v) => k->v.map{case (ki,vi) => vi}}.sortByKey(true) val a31 = a21.map {case (k,v ) => k->v.map{case (ki,vi) => ki}}.sortByKey(true) a3.union(a31)
  • 请更新您的问题。

标签: scala apache-spark union rdd


【解决方案1】:

据我所知,您只需要一个join

val a = sc.parallelize(Seq(
  ("k1" -> List("A", "B" , "C")), ("k2" -> List("W", "E", "Q"))))
val b = sc.parallelize(Seq(
  ("k1" -> List("D", "E", "F")), ("k2" -> List("E", "U", "O"))))

val combined = a.join(b) // Join by key
  .values  // drop keys
  .map{case (x, y) =>  x ++ y} // Combine elements

【讨论】:

    【解决方案2】:

    注意:此答案与问题的第 4 版相符。从那以后,这个问题发生了变化。我没有删除答案,因为有一些关于使用zip的陷阱@


    你可以使用zip:

    val rdd1 = sc.parallelize(Seq("A", "B", "C"))
    val rdd2 = sc.parallelize(Seq("D", "E", "F"))
    
    val zipped = rdd1.zip(rdd2)
    

    导致

    scala>zipped.collect().foreach(println)
    (A,D)
    (B,E)
    (C,F)
    

    【讨论】:

    • 我看起来不像 OP 想要的东西。更不用说 Spark 中的 zip 相当棘手。
    • @zero323 它匹配问题的第 4 版 - 从那时起问题发生了变化。 zip 在哪些情况下比较棘手?
    • zip 的问题是它需要相同数量的分区(简单部分)和每个分区上相同数量的元素(困难部分)。它适用于来自同一血统的 RDD,但总的来说有点没用。
    • @zero323 谢谢你的sn-p。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-28
    • 1970-01-01
    相关资源
    最近更新 更多