【问题标题】:scala.MatchError: null on spark RDDsscala.MatchError:火花 RDD 上为空
【发布时间】:2014-08-30 09:50:50
【问题描述】:

我对 spark 和 scala 都比较陌生。 我试图在 spark 上使用 scala 实现协同过滤。 下面是代码

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

val data = sc.textFile("/user/amohammed/CB/input-cb.txt")

val distinctUsers = data.map(x => x.split(",")(0)).distinct().map(x => x.toInt)

val distinctKeywords = data.map(x => x.split(",")(1)).distinct().map(x => x.toInt)

val ratings = data.map(_.split(',') match {
  case Array(user, item, rate) => Rating(user.toInt,item.toInt, rate.toDouble)
})

val model = ALS.train(ratings, 1, 20, 0.01)

val keywords = distinctKeywords collect
  distinctUsers.map(x => {(x, keywords.map(y => model.predict(x,y)))}).collect()

它抛出一个 scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571) 在最后一行 如果我将 distinctUsers rdd 收集到一个数组中并执行相同的代码,则 Thw 代码可以正常工作:

val users = distinctUsers collect
  users.map(x => {(x, keywords.map(y => model.predict(x, y)))})

我在处理 RDD 时哪里出错了?

Spark 版本:1.0.0 斯卡拉版本:2.10.4

【问题讨论】:

  • 我不确定,但请尝试将keywords 放在最后一行之前的broadcast variable 中。 (我永远不确定什么时候会自动捕获某些内容以及何时需要广播......)
  • 这是个好主意。我从来没有想过。感谢您的想法,但错误仍然存​​在。我也试过广播变量model
  • 你的 spark 版本号是多少?
  • 抱歉,我没有在问题中提及。火花:1.0.0。斯卡拉:2.10.4
  • 请注意:通常x => {(x, keywords.map(y => model.predict(x,y)))} 会写得更像{ x => (x, keywords.map { y => model.predict(x, y) }) }{} 用于定义一个块,该块将其最后一条语句作为其值返回。您可能已经注意到,_ 可用于按位置匹配参数,如 List(1, 2, 3).map(_ * 2).reduce(_ + _),但为了清楚起见或使用两次参数,您应该给它一个名称。

标签: scala apache-spark rdd apache-spark-mllib collaborative-filtering


【解决方案1】:

在堆栈跟踪中进一步调用一次,MatrixFactorizationModel 源代码的第 43 行说:

val userVector = new DoubleMatrix(userFeatures.lookup(user).head)

注意modeluserFeatures字段本身就是另一个RDD;我相信当匿名功能块在model 上关闭时它没有被正确序列化,因此它上面的lookup 方法失败了。我还尝试将modelkeywords 都放入广播变量中,但这也不起作用。

与其回退到 Scala 集合并失去 Spark 的优势,不如坚持使用 RDD 并利用其他转换它们的方法。

我会从这个开始:

val ratings = data.map(_.split(',') match {
  case Array(user, keyword, rate) => Rating(user.toInt, keyword.toInt, rate.toDouble)
})

// instead of parsing the original RDD's strings three separate times,
// you can map the "user" and "product" fields of the Rating case class

val distinctUsers = ratings.map(_.user).distinct()
val distinctKeywords = ratings.map(_.product).distinct()

val model = ALS.train(ratings, 1, 20, 0.01)

然后,我们不需要逐个计算每个预测,而是将所有可能的用户-关键字对的笛卡尔积作为 RDD,并使用 MatrixFactorizationModel 中的另一个 predict 方法,该方法将此类对的 RDD 作为其论据。

val userKeywords = distinctUsers.cartesian(distinctKeywords)

val predictions = model.predict(userKeywords).map { case Rating(user, keyword, rate) =>
  (user, Map(keyword -> rate))
}.reduceByKey { _ ++ _ }

现在predictions 为每个用户提供了一个不可变的地图,可以查询特定关键字的预测评级。如果您特别想要原始示例中的数组,您可以这样做:

val keywords = distinctKeywords.collect() // add .sorted if you want them in order
val predictionArrays = predictions.mapValues(keywords.map(_))

警告:我使用 Spark 1.0.1 进行了测试,因为它是我安装的,但它也应该适用于 1.0.0。

【讨论】:

  • 谢谢,这正是我想要的。我想知道一些学习 Spark 和 Scala 的链接或书籍。
  • Spark 文档一直在改进——我之前没有使用过 MLlib 的协同过滤,但是您可能已经看到的 example 帮助我使用它。对于一般处理 Scala,您可以尝试 Twitter 的 Scala School,它提供了该语言许多有用特性的快速概览。
猜你喜欢
  • 2015-07-10
  • 1970-01-01
  • 2017-02-28
  • 1970-01-01
  • 1970-01-01
  • 2018-04-26
  • 1970-01-01
  • 1970-01-01
  • 2021-01-20
相关资源
最近更新 更多