【发布时间】:2014-11-11 00:35:23
【问题描述】:
其实我有两个结构相同的RDD [(String, (Int, scala.collection.immutable.Map[String,Int], Double))]
rdd1
(A,(1,Map(VVV -> 1),0.0))
(B,(26,Map(DDD -> 2, PPP -> 7, OOO -> 2, EEE -> 3, LLL -> 12),1.35))
(C,(2,Map(VVV -> 2),0.0))
rdd2
(OOO,(2,Map(B -> 2),0.0))
(DDD,(2,Map(B -> 2),0.0))
(PPP,(7,Map(B -> 7),0.0))
(LLL,(12,Map(B -> 12),0.0))
(VVV,(3,Map(C -> 2, A -> 1),0.63))
(EEE,(3,Map(B -> 3),0.0))
我需要迭代 rdd1 和每个映射键 ((VVV), (DDD, PPP, OOO, EEE, LLL), (VVV)) 以在 rdd2 中搜索其键,然后调用一个函数来执行计算。
这样做的方法是什么?那可能吗?迭代一个RDD,并根据一个值在另一个RDD中按键搜索。
我测试过:
def calculate(t: String, c: Int, m: scala.collection.immutable.Map[String,Int], e: Double, r: org.apache.spark.rdd.RDD[(String, (Int, scala.collection.immutable.Map[String,Int], Double))]) = {
Tuple5(t,c,m,e,r.lookup("DDD"))
}
val newRDD = rdd1.map(f => calculate(f._1, f._2._1, f._2._2, f._2._3, rdd2))
当我执行newRDD.take(10).foreach(println(_))
它给了我以下错误信息:
14/11/10 13:30:46 ERROR Executor: Exception in task ID 54 scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:572)
另一个测试是:
rdd1.foreach(a => { rdd2.foreach(b => { println(b)}) })
但它给了我以下错误信息:
14/11/10 13:35:23 ERROR Executor: Exception in task ID 55 java.lang.NullPointerException
at org.apache.spark.rdd.RDD.foreach(RDD.scala:715)
【问题讨论】:
-
它不起作用的原因是因为 RDD 不能在像这样的闭包中使用
rdd1.foreach(a => { rdd2.foreach(b => { println(b)}) })您需要根据 RDD 上的操作来表达您的计算,而不是“使用” RDD。将 RDD 视为指向分布式集合的“指针”,而不是集合本身。提供的答案显示了解决此问题的方法,我只是想指出根本原因。
标签: scala apache-spark