[Posted at]: 2016-06-24 00:20:59
[Problem description]:
How do I filter by list.contains()? Here is my current code. I have a Main class that reads the command-line arguments and, based on that input, runs the corresponding dispatcher. In this case it is a RecommendDispatcher class, which does all its work in the constructor: it trains the model and generates recommendations for each of the users passed in:
import java.io.{BufferedWriter, File, FileWriter}
import java.text.DecimalFormat
import Util.javaHash
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.{SparkConf, SparkContext}

class RecommendDispatcher(master: String, inputFile: String, outputFile: String, userList: List[String]) extends java.io.Serializable {
  val format: DecimalFormat = new DecimalFormat("0.#####")
  val file = new File(outputFile)
  val bw = new BufferedWriter(new FileWriter(file))
  val conf = new SparkConf().setAppName("Movies").setMaster(master)
  val sparkContext = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
  val baseRdd = sparkContext.textFile(inputFile)
  val movieIds = baseRdd.map(line => line.split("\\s+")(1)).distinct().map(id => (javaHash(id), id))
  val userIds = baseRdd.map(line => line.split("\\s+")(3)).distinct()
    .filter(x => userList.contains(x))
    .map(id => (javaHash(id), id))
  val ratings = baseRdd.map(line => line.split("\\s+"))
    .map(tokens => (tokens(3), tokens(1), tokens(tokens.indexOf("review/score:") + 1).toDouble))
    .map(x => Rating(javaHash(x._1), javaHash(x._2), x._3))

  // Build the recommendation model using ALS
  val rank = 10
  val numIterations = 10
  val model = ALS.train(ratings, rank, numIterations, 0.01)

  val users = userIds.collect()
  var mids = movieIds.collect()
  users.foreach(u => {
    bw.write("Recommendations for " + u + ":\n")
    var ranked = List[(Double, Int)]()
    mids.foreach(x => {
      val movieId = x._1
      val prediction = (model.predict(u._1, movieId), movieId)
      ranked = ranked :+ prediction
    })
    // Sort in descending order
    ranked = ranked.sortBy(x => -1 * x._1)
    ranked.foreach(x => bw.write(x._1 + " ; " + x._2 + "\n"))
  })
  bw.close()
}
This exception is thrown at the `.filter` line:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
[Discussion]:
- What is the type of userList?
- @ethan A Task-not-serializable exception is the result of a closure "leak". The best way to debug these kinds of errors is with the full code context. I suggest you put the whole code in a Databricks Community Edition notebook and share the link here. You can sign up at accounts.cloud.databricks.com/registration.html#signup/…
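The "closure leak" mentioned above is the likely cause: in Scala, referencing the constructor parameter `userList` inside the `.filter` lambda is a field access on the enclosing instance, so the closure captures `this`, and Spark then tries to serialize the whole `RecommendDispatcher`, including its non-serializable `SparkContext`. A minimal, Spark-free sketch of the capture problem and the usual fix (copy the field into a local val before building the closure); `NotSerializableCtx` and `Dispatcher` are hypothetical stand-ins for SparkContext and the dispatcher class:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a non-serializable member such as SparkContext.
class NotSerializableCtx

class Dispatcher(userList: List[String]) extends Serializable {
  val ctx = new NotSerializableCtx // like the SparkContext field

  // BAD: the lambda reads the field `userList` (really `this.userList`),
  // so it captures `this`; serializing it drags in `ctx` and fails.
  def leakyFilter: String => Boolean = x => userList.contains(x)

  // GOOD: copy the field into a local val first; the lambda then
  // captures only that small, serializable list.
  def safeFilter: String => Boolean = {
    val localList = userList
    x => localList.contains(x)
  }
}

// Helper: returns true if `obj` survives Java serialization.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch { case _: java.io.NotSerializableException => false }
```

In the question's code, the analogous change would be a local copy such as `val localUsers = userList` taken just before building `userIds`, with the `.filter` using `localUsers` instead of `userList`.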
Tags: scala apache-spark