【问题标题】:how to merge two RDD to one RDD [duplicate]如何将两个RDD合并到一个RDD [重复]
【发布时间】:2016-12-13 11:45:21
【问题描述】:

求助,我有两个 RDD,我想合并到一个 RDD。这是我的代码。

val us1 = sc.parallelize(Array(("3L"), ("7L"),("5L"),("2L")))
val us2 = sc.parallelize(Array(("432L"), ("7123L"),("513L"),("1312L")))

【问题讨论】:

  • 您的预期输出是什么,您尝试过什么?
  • 3L 7L 5L 2L 432L 7123L 513L 1312L
  • 我想要这个RDD,意思是两个RDD合并到一个RDD
  • val newrdd = us1.++(us2)

标签: scala apache-spark rdd


【解决方案1】:

只需使用联合:

val merged = us1.union(us2)

文档是here

Scala 中的快捷键是:

val merged = us1 ++ us2
【解决方案2】:

您需要RDD.union这些不加入密钥。 Union 本身并没有真正做任何事情,所以它的开销很低。请注意,合并后的 RDD 将拥有原始 RDD 的所有分区,因此您可能希望在联合后合并。

val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
val z = x.union(y)
z.collect
res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))

API

def++(other: RDD[T]): RDD[T]

返回这个RDD和另一个RDD的联合。

def++ API

def union(other: RDD[T]): RDD[T]

返回这个 RDD 和另一个 RDD 的并集。任何相同的元素都会出现多次(使用 .distinct() 消除它们)。

def union API

【讨论】:

  • 为什么要在之后合并?如果两个输入 RDD 被正确分区,那么 union RDD 也将是。
  • 只是为了性能和更新分区。它不是强制性的,但可以做到。它返回一个新的 RDD,该 RDD 被缩减为 numPartitions 个分区。
  • 好吧,我明白了 coalesce 的作用。但是,如果您的分区在两个输入 RDD 中的大小都正确,则执行合并将产生太大的分区(特别是如果您使用 shuffle = false 选项)
  • 然后,如果它的分区正确完成,那么一切都很好。你的代码很好:)
猜你喜欢
  • 2017-08-05
  • 1970-01-01
  • 2016-01-03
  • 2015-03-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多