【发布时间】:2014-08-30 09:50:50
【问题描述】:
我对 spark 和 scala 都比较陌生。 我试图在 spark 上使用 scala 实现协同过滤。 下面是代码
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
val data = sc.textFile("/user/amohammed/CB/input-cb.txt")
val distinctUsers = data.map(x => x.split(",")(0)).distinct().map(x => x.toInt)
val distinctKeywords = data.map(x => x.split(",")(1)).distinct().map(x => x.toInt)
val ratings = data.map(_.split(',') match {
case Array(user, item, rate) => Rating(user.toInt,item.toInt, rate.toDouble)
})
val model = ALS.train(ratings, 1, 20, 0.01)
val keywords = distinctKeywords collect
distinctUsers.map(x => {(x, keywords.map(y => model.predict(x,y)))}).collect()
它抛出一个 scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571) 在最后一行 如果我将 distinctUsers rdd 收集到一个数组中并执行相同的代码,则 Thw 代码可以正常工作:
val users = distinctUsers collect
users.map(x => {(x, keywords.map(y => model.predict(x, y)))})
我在处理 RDD 时哪里出错了?
Spark 版本:1.0.0 斯卡拉版本:2.10.4
【问题讨论】:
-
我不确定,但请尝试将
keywords放在最后一行之前的broadcast variable 中。 (我永远不确定什么时候会自动捕获某些内容以及何时需要广播......) -
这是个好主意。我从来没有想过。感谢您的想法,但错误仍然存在。我也试过广播变量
model。 -
你的 spark 版本号是多少?
-
抱歉,我没有在问题中提及。火花:1.0.0。斯卡拉:2.10.4
-
请注意:通常
x => {(x, keywords.map(y => model.predict(x,y)))}会写得更像{ x => (x, keywords.map { y => model.predict(x, y) }) }。{}用于定义一个块,该块将其最后一条语句作为其值返回。您可能已经注意到,_可用于按位置匹配参数,如List(1, 2, 3).map(_ * 2).reduce(_ + _),但为了清楚起见或使用两次参数,您应该给它一个名称。
标签: scala apache-spark rdd apache-spark-mllib collaborative-filtering