协同过滤常用于推荐系统,这项技术旨在填补 丢失的user-item关联矩阵 的条目,spark.ml目前支持基于模型的协同过滤(用一些丢失条目的潜在因素在描述用户和产品)。spark.ml使用ALS(交替最小二乘法)去学习这些潜在因素。在spark.ml中的实现有以下参数:
numBlocks:块的数量,user和item将被分成多少块,以并行计算。(默认10)
ranK:模型隐含因素的个数。(默认10)
maxIter:模型的最大迭代次数。(默认10)
regParam :ALS的正则化参数。(默认1.0)
implicitPrefs :使用显式反馈还是隐式反馈。(默认false,即显式反馈)
alpha:信心权重所应达到的基准线。(默认1.0)
nonnegative :是否使用非负数的约束。(默认false)
注意:基于dataFrame - API的ALS目前只支持整数型的userID和itemID,其他数字类型也支持,但是取值范围必须在整数之内。
显式反馈VS隐式反馈
标准的基于矩阵分解的协同过滤方法对待user-item矩阵的条目项 是显式地给出user对item的偏好,例如,用户给电影评级。
而现实生活中常见案例是只能有隐式反馈(例如:视图,点击鼠标,购买,喜欢,分享……)。在spark.ml中使用的方法是:对隐式反馈数据集的协同过滤。实际上,这种方法不是直接对数据矩阵进行建模,而是将数据视为代表用户行为意愿强度的数字(例如点击的次数或某人累积观看电影的时间)。然后,这些数字与观察到的用户偏好的置信水平相关,而不是给予项目的明确评级。 然后,该模型尝试找到可用于预测用户对项目的预期偏好的潜在因素。
Scaling of the regularization parameter(正则化参数的换算)
我们通过用户在更新用户因素中产生的评分,或产品在更新产品因素中收到的评分来求解每个最小二乘问题的规则化参数 regParam 。 这种方法被命名为“ ALS-WR ”(加权正则化交替最小二乘法),并在论文“ Large-Scale Parallel Collaborative Filtering for the Netflix Prize ”中进行了讨论。 它使 regParam 对数据集的规模依赖较少,因此我们可以将从采样子集学到的最佳参数应用于完整数据集,并期望能有相似的表现。
冷启动策略
在进行预测时使用ALSModel,通常遇到这样一种情况,训练模型的时候,user、item在测试数据集没有出现。这种情况通常发生在两个场景:
- 在生产中,新user或item,没有评级的历史,或者模型没有训练(这是“冷启动问题”)。
- 在交叉验证中,数据分为训练集和评价集。当简单的随机生成
CrossValidator或 TrainValidationSplit,它实际上是非常常见的出现这样情况:user、item在验证集,但不在训练集。
默认情况下,当user、item不在模型之中,spark在ALSModel.transform分配NaN预测,这在预测系统中非常的有用,因为它表明了一个新的user、item,系统可以做一个后备性的预测。
然而在交叉验证中这是不可取的,因为任何一个NaN预测会导致评估度量的NaN结果(例如当使用RegressionEvaluator时)这使得模型选择是不可能的。
spark允许用户设置coldStartStrategy参数为“drop”,以便删除包含NaN值的预测的DataFrame中的任何行。 然后,将根据非NaN数据计算评估度量,并将有效。 以下示例说明了此参数的用法。
注意:目前支持的冷启动策略是“nan”(上面提到的默认行为)和“drop”。 未来可能会支持进一步的策略。
例子
在以下示例中,我们从MovieLens数据集加载评级数据,每行由用户,电影,评级和时间戳组成。 然后,假设我们训练一个ALS模型,默认情况下假定评级是明确的(implicitPrefs是false)。 我们通过测量评级预测的均方根误差来评估推荐模型。
有关API的更多详细信息,请参阅ALS Java文档。
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
public static class Rating implements Serializable {
private int userId;
private int movieId;
private float rating;
private long timestamp;
public Rating() {}
public Rating(int userId, int movieId, float rating, long timestamp) {
this.userId = userId;
this.movieId = movieId;
this.rating = rating;
this.timestamp = timestamp;
}
public int getUserId() {
return userId;
}
public int getMovieId() {
return movieId;
}
public float getRating() {
return rating;
}
public long getTimestamp() {
return timestamp;
}
public static Rating parseRating(String str) {
String[] fields = str.split("::");
if (fields.length != 4) {
throw new IllegalArgumentException("Each line must contain 4 fields");
}
int userId = Integer.parseInt(fields[0]);
int movieId = Integer.parseInt(fields[1]);
float rating = Float.parseFloat(fields[2]);
long timestamp = Long.parseLong(fields[3]);
return new Rating(userId, movieId, rating, timestamp);
}
}
JavaRDD<Rating> ratingsRDD = spark
.read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];
// Build the recommendation model using ALS on the training data
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
ALSModel model = als.fit(training);
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction");
Double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
在Spark repo中的“examples / src / main / java / org / apache / spark / examples / ml / JavaALSExample.java”中查找完整示例代码。
如果评级矩阵是从另一个信息来源导出的(即从其他信号推断出来),您可以将implicitPrefs设置为true以获得更好的结果:
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");