【问题标题】:Merging RDDs using Scala Apache Spark使用 Scala Apache Spark 合并 RDD
【发布时间】:2015-07-05 00:05:39
【问题描述】:

我有 2 个 RDD。

RDD1: ((String, String), Int)
RDD2: (String, Int)

例如:

    RDD1

    ((A, X), 1)
    ((B, X), 2)
    ((A, Y), 2)
    ((C, Y), 3)

    RDD2

    (A, 6)
    (B, 7)
    (C, 8)

Output Expected

    ((A, X), 6)
    ((B, X), 14)
    ((A, Y), 12)
    ((C, Y), 24)

在 RDD1 中,(String, String) 组合是唯一的,而在 RDD2 中,每个字符串键都是唯一的。 RDD2 (6) 中 A 的得分乘以 RDD1 中所有 key 为 A 的条目的得分值。

14 = 7 * 2
12 = 6 * 2
24 = 8 * 3

我写了以下内容,但在案例中出现错误:

val finalRdd = countRdd.join(countfileRdd).map(case (k, (ls, rs)) => (k, (ls * rs)))

有人可以帮我解决这个问题吗?

【问题讨论】:

  • 我猜(A, X),6 应该是(A, X),12 按照你的逻辑,还是我错过了什么?
  • 它是 6,因为 RDD2 中 A 的值为 6,而在 RDD1 中,(A,X) 的值为 1。所以它们相乘得到 6。

标签: java scala apache-spark


【解决方案1】:

您的第一个 RDD 与第二个 RDD 的键类型不同(元组 (A, X) 与 A)。您应该在加入之前对其进行转换:

val rdd1  = sc.parallelize(List((("A", "X"), 1), (("A", "Y"), 2)))
val rdd2 = sc.parallelize(List(("A", 6)))
val rdd1Transformed = rdd1.map { 
   case ((letter, coord), value) => (letter, (coord, value)) 
}
val result = rdd1Transformed
  .join(rdd2)
  .map { 
    case (letter, ((coord, v1), v2)) => ((letter, coord), v1 * v2) 
  }
result.collect()
res1: Array[((String, String), Int)] = Array(((A,X),6), ((A,Y),12))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-10-18
    • 2023-03-18
    • 2015-10-31
    • 1970-01-01
    • 2016-12-16
    • 1970-01-01
    • 2014-05-13
    相关资源
    最近更新 更多