算法:
1. 选择k个簇中心,作为聚类中心 。
2. 计算每个样本点到聚类中心的距离,将每个样品点分配到最近的聚类中心,形成k个簇。
3. 计算每个簇的平均值,并将这个平均值作为新的聚类中心。
4. 反复执行2、3步骤,直到旧质心和新质心的差异小于阈值或迭代次数达到要求为止。
实例:
在IDEA运行,如果是spark-shell命令行窗口,
可使用:paste进入粘贴模式,注意spark-shell下代码中不能有tab
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.Vectors
object KMeans {
def main(args: ArrayString]) {
// 1. 构造spark对象
val conf = new SparkConf().setMaster("local").setAppName("KMeans")
val sc = new SparkContext(conf)
// 去除多余的warn信息
// 2. 读取样本数据,LIBSVM格式
val data = sc.textFile("file:///test/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// 3. 新建KMeans模型,并训练
val initMode = "k-means||"
val numClusters = 2
val numIterations = 20
// 等同于:val model = KMeans.train(parsedData,2,20)
val model = new KMeans()
.setInitializationMode(initMode)
.setK(numClusters)
.setMaxIterations(numIterations)
.run(parsedData)
// 打印聚类中心
model.clusterCenters.foreach(println)
// 4. 误差计算
val WSSSE = model.computeCost(parsedData)
println("Within Set Sum of Squard Errors = " + WSSSE)
// 5. 保存模型、加载模型
val ModelPath = "file:///test/model/KMeans"
model.save(sc, ModelPath)
val sameModel = KMeansModel.load(sc, ModelPath)
}
}
如果报错内存不足:
Java.lang.IllegalArgumentException: System memory 468189184 must be at least 4.718592E8. Please use a larger heap size.
加上语句:
conf.set("spark.testing.memory", "2147480000") //数值大于512m即可
结果:
源码分析:
1. KMeans对象 基于随机梯度下降的SVM分类的伴生对象
train方法 我们常用的train函数,通过调用run方法来训练
2. KMeans类
run方法 训练方法,调用runAlgorithm方法来计算中心点
3. runAlgorithm方法 计算聚类中心点(kmeans算法的核心方法)
initRandom 初始化中心点的方法,支持random和kmeans++两种方法
iteration 迭代计算并更新中心点
4. KMeansModel 模型
predict 预测
1. KMeans对象(定义train方法)
伴生对象:同一个文件中对象名和类名一样,
伴生类和伴生对象的特点是可以相互访问被private修饰的字段
object KMeans{
// 待会初始化中心点的两种方式
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"
/**
* train函数的参数
* data:数据样本,格式为RDD[Vector]
* k:聚类数
* maxIterations:最大迭代次数
* runs:算法的并行度,默认为1
* initializationMode:初始化聚类中心点,支持random、kmeans++(默认)
* seed:初始化时的随机种子
* train函数返回一个KMeansModel类型的模型,KMeansModel介绍在下面
*/
def train(
data: RDD[Vector],
k: Int,
maxIterations: Int,
runs: Int,
initializationMode: String,
seed: Long): KMeansModel = {
new KMeans()
.setK(k)
.setMaxIterations(maxIterations)
.setInitializationMode(initializationMode)
.setSeed(seed)
.run(data)
// 调用了run方法,run方法在KMeans类中
}
// 以下的几个def train()略,每个train减少一个参数重新定义一遍
}
KMeansModel:
train 函数训练后,生成 KMeans 模型
KMeansModel 包含的参数:中心点向量
KMeansModel 包含的方法:预测 predict、保存模型 save、加载模型 load
2. KMeans类
2.1 参数定义
KMeans类的参数与KMeans对象相比,少了data、多了epsilon和initializationSteps。data是run函数的参数。
/**
* epsilon:中心点距离的阈值,默认为1e-4
* initializationSteps:初始步长,默认为5
*/
class KMeans private (
private var k: Int,
private var maxIterations: Int,
private var runs: Int,
private var initializationMode: String,
private var initializationSteps: Int
private var epsilon: Double,
private var seed: Long) extends Serializable with Logging {
/**
* 设置这些参数的默认值
*/
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
/**
* 我们指定的参数值(k、maxIterations等),通过get-set方法获取
*/
// 参数:k
def getK: Int = k
def setK(k: Int): this.type = {
this.k = k
this
}
// 其他参数略
}
2.2 定义run方法
KMeans类的run方法是训练模型的,该方法主要调用runAlgorithm方法来计算中心点。
因为是迭代计算,所以数据应该被缓存(RDD的persisit)
/**
* 根据样本数据data进行模型训练,返回值:KMeansModel
*/
def run(data: RDD[vector]): KMeansModel = {
// 如果数据没有被缓存则warning
if (data.getStorageLevel == StorageLevel.NONE){
logWarning("The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached")
}
// 计算L2范数,并缓存。向量的2范数:向量的每个元素的平方和再开平方根.
// 这一步的目的是将数据转换为RDD[VectorWithNorm]格式,以便runAlgorithm方法使用
// data格式为RDD[vector],zippedData格式为RDD[vector, vector的L2范数]
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
val zippedData = data.zip(norms).map { case (v, norm) =>
new VectorWithNorm(v, norm)} // 详解在下面
// 运行KMeans的核心方法,runAlgorithm方法,计算聚类中心点
val model = runAlgorithm(zippedData)
norms.unpersist()
model
}
演示一下数据格式的转换:
RDD[String] -> RDD[Vector] -> RDD[Double] -> RDD[Vector, Double)] -> RDD[VectorWithNorm]
(1) 首先RDD[String]转成RDD[Vector]
scala> val data = sc.textFile("file:///test/kmeans_data.txt") //data格式:RDD[String]
data: org.apache.spark.rdd.RDD[String] = file:///test/kmeans_data.txt MapPartitionsRDD[5] at textFile at <console>:30
scala> data.collect
res0: Array[String] = Array(0.1 0.1, 0.2 0.2, 9.0 9.0, 9.1 9.1, 9.2 9.2)
scala> val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[6] at map at <console>:31
scala> parsedData.collect // parsedData格式:RDD[Vector]
res1: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.1], [0.2,0.2], [9.0,9.0], [9.1,9.1], [9.2,9.2])
(2)RDD[Vector]转换成RDD[VectorWithNorm] (主要步骤)
scala> val norms = parsedData.map(Vectors.norm(_, 2.0)) // norms格式:RDD[Double]
norms: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[7] at map at <console>:31
scala> norms.collect
res2: Array[Double] = Array(0.14142135623730953, 0.28284271247461906, 12.727922061357855, 12.869343417595164, 13.010764773832474)
scala> val k = parsedData.zip(norms) // k格式:RDD[Vector, Double)]
k: org.apache.spark.rdd.RDD[(org.apache.spark.mllib.linalg.Vector, Double)] = ZippedPartitionsRDD2[10] at zip at <console>:33
scala> k.collect
res3: Array[(org.apache.spark.mllib.linalg.Vector, Double)] = Array(([0.1,0.1],0.14142135623730953), ([0.2,0.2],0.28284271247461906), ([9.0,9.0],12.727922061357855), ([9.1,9.1],12.869343417595164), ([9.2,9.2],13.010764773832474))
scala> val zippedData = k.map{case (v, norm) => new VectorWithNorm(v, norm)}
// zippedData格式:RDD[VectorWithNorm]
zippedData: org.apache.spark.rdd.RDD[VectorWithNorm] = MapPartitionsRDD[13] at map at <console>:32
scala> zippedData.collect
res4: Array[VectorWithNorm] = Array([email protected], [email protected], [email protected], [email protected], [email protected])
VectorWithNorm是后面自定义的数据类型:
class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable {
def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
def this(array: Array[Double]) = this(Vectors.dense(array))
/** Converts the vector to a dense vector. */
def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm)
}
3. runAlgorithm方法
计算聚类中心点(KMeans算法的核心方法)
(1)首先初始化中心点
(2)计算每个样本与各个中心点的距离,把样本分配到最近的中心点。
同时计数、统计中心点累加的样本值。
(3)将每个簇的样本重新计算中心点,更新中心点
(4)计算新的中心点与更新前中心点的差值,与阈值比较,判断是否结束迭代
/**
* KMeans算法的核心:runAlgorithm方法
* 输入data:RDD[VectorWithNorm],输出KMeansModel
*/
private def runAlgorithm(data:RDD[VectorWithNorm]): KMeansModel = {
val sc = data.sparkContext // 创建sc
// 计算初始化中心点的时间不重要,略
// 初始化中心
val centers = if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data)
}
val active = Array.fill(runs)(true)
val costs = Array.fill(runs)(0.0)
val activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
val iteration = 0
// 迭代运算
while (iteration < maxIterations && !activeRuns.isEmpty) {
type WeightedPoint = (Vector, Long) // 自定义WeightedPoint类型
// mergeContribs函数用于计算样本点的和sum,后面会调用
def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
/**
* axpy:y._1 = y._1 + x._1
* 其中下划线表示取值, y._1 是y的第一个元素,y: (Vector, Long)
*/
axpy(1.0, x_1, y._1)
(y._1, x._2 + y._2) // 返回值,WeightedPoint类型
}
// activeCenters用来计算聚类中心点
// activeCenters格式:由 runs 个 VectorWithNorm 组成
val activeCenters = activeRuns.map(r => centers(r)).toArray
// costAccums用来计算样本所属中心点下的cost值之和(使用了accumulator累加器)
val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
// 广播聚类中心点
val bcActiveCenters = sc.broadcast(activeCenters)
/**
* 计算属于每个中心点的样本点,并对每个簇的样本进行累加和计数
* runs:并行度,k:中心点个数
* sum:中心点对应的所有样本的累加值,counts:中心点样本计数
* contribs:((并行度 i,中心点 j),(sum,count)
* findClosest方法:找到点与中心点最近的中心点
*/
val totalContribs = data.mapPartitions { points =>
val thisActiveCenters = bcActiveCenters.value // 当前的聚类中心点
val runs = thisActiveCenters.length // 还剩多少runs存活,runs:并行度
val k = thisActiveCenters(0).length // k
val dims = thisActiveCenters(0)(0).vector.size // 中心点的维度
// sums 用来计算每个簇所有样本的向量之和
val sums = Array.fill(runs, k)(Vectors.zeros(dims)) //(见代码演示)
// counts 用来统计每个簇样本点的数量
val counts = Array.fill(runs, k)(0L)
// 计算样本point属于哪个中心点bestCenter,以及在该中心点下的cost值
points.foreach { point =>
(0 until runs).foreach { i => //并行计算
val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
costAccums(i) += cost //并行度i下的cost之和
axpy(1.0, point.vector, sum) // sum = sum + point
counts(i)(bestCenter) += 1 // 计数
}
}
// 得到每个簇下样本点的向量和、样本点的数目(i是并行度,j是聚类中心)
val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
((i, j), (sums(i)(j), counts(i)(j))) // for yield见代码演示
}
contribs.iterator
}.reduceByKey(mergeContribs).collectAsMap()
// 聚合操作,对于相同的(i, j),i是并行度,j是聚类中心
// 对属于同一中心下的样本向量之和、样本数量进行累加操作
// 更新中心点,sum = sum / count
for ((run, i) <- activeRuns.zipWithIndex) {
val changed = false
val j = 0
while (j < k) {
if (count != 0) {
scal(1.0 / count, sum) // sum = sum/count
val newCenter = new VectorWithNorm(sum) // 新中心点
// 比较新旧中心点距离和阈值
if (KMeans.fastSquaredDistance(newCenter, centers(run)(j))
> epsilon*eplison){
change = true // 如果大于,则中心点改变了,需要更新中心点
}
centers(run)(j) = newCenter // 更新中心点
}
j += 1
}
if (!changed) {
active(run) = false // 所有中心点不再改变了
}
costs(run) = costAccums(i).value
}
activeRuns = activeRuns.filter(active(_)) //过滤中心点已完成收敛的并行计算
iteration += 1
}
new KMeansModel(centers(bestRun).map(_.vector))
}
代码演示:
val sums = Array.fill(runs, k)(Vectors.zeros(dims))
使用 dims 维零向量填充 runs 行 k 列的数组
scala> val sums = Array.fill(1, 3)(Vectors.zeros(2))
sums: Array[Array[org.apache.spark.mllib.linalg.Vector]] = Array(Array([0.0,0.0], [0.0,0.0], [0.0,0.0]))
for循环配合yield:
for循环中的 yield 会把当前的元素记下来,保存在集合中,循环结束后将返回该集合。
scala> for (i <- 1 to 5) yield i
res10: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)
其它
初始化中心点:
初始化中心点的方法,支持random和kmeans++两种方法
看random方法:
private def initRandom(data: RDD[VectorWithNorm]): Array[Array[VectorWithNorm]]={
// 从样本数据data中随机抽取runs*k个数据作为中心点
val sample = data.takeSample(true, runs *k, 第三参数略)...
}
takeSample函数:
类似于sample函数,该函数接受三个参数,第一个参数withReplacement ,表示采样是否放回,true表示有放回的采样,false表示无放回采样;第二个参数num,表示返回的采样数据的个数,这个也是takeSample函数和sample函数的区别;第三个参数seed,表示用于指定的随机数生成器种子。
快速距离计算:
KMeans 对象定义了
(1)findClost 方法
快速找到点与所有中心点中最近的一个中心点
(2)fastSquaredDistance 方法
快速计算两点之间的距离