算法:

1. 选择k个簇中心,作为聚类中心 。

2. 计算每个样本点到聚类中心的距离,将每个样品点分配到最近的聚类中心,形成k个簇。

3. 计算每个簇的平均值,并将这个平均值作为新的聚类中心。

4. 反复执行2、3步骤,直到旧质心和新质心的差异小于阈值或迭代次数达到要求为止。

 

实例:

在IDEA运行,如果是spark-shell命令行窗口,

可使用:paste进入粘贴模式,注意spark-shell下代码中不能有tab


import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.Vectors

object KMeans {

  def main(args: ArrayString]) {
  // 1. 构造spark对象
    val conf = new SparkConf().setMaster("local").setAppName("KMeans")
    val sc = new SparkContext(conf)
    // 去除多余的warn信息

    // 2. 读取样本数据,LIBSVM格式
    val data = sc.textFile("file:///test/kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

    // 3. 新建KMeans模型,并训练
    val initMode = "k-means||"
    val numClusters = 2
    val numIterations = 20
    // 等同于:val model = KMeans.train(parsedData,2,20)
    val model = new KMeans()
    .setInitializationMode(initMode)
    .setK(numClusters)
    .setMaxIterations(numIterations)
    .run(parsedData)
    // 打印聚类中心
    model.clusterCenters.foreach(println)
    
    // 4. 误差计算
    val WSSSE = model.computeCost(parsedData)
    println("Within Set Sum of Squard Errors = " + WSSSE)

    // 5. 保存模型、加载模型
    val ModelPath = "file:///test/model/KMeans"
    model.save(sc, ModelPath)
    val sameModel = KMeansModel.load(sc, ModelPath)
  }
}

如果报错内存不足:

Java.lang.IllegalArgumentException: System memory 468189184 must be at least 4.718592E8. Please use a larger heap size.

加上语句:

conf.set("spark.testing.memory", "2147480000") //数值大于512m即可

结果:

实例1:KMeans(含源码分析)

 

源码分析:

1. KMeans对象	基于随机梯度下降的SVM分类的伴生对象
    train方法	我们常用的train函数,通过调用run方法来训练
2. KMeans类			
    run方法		训练方法,调用runAlgorithm方法来计算中心点
3. runAlgorithm方法	计算聚类中心点(kmeans算法的核心方法)
    initRandom	初始化中心点的方法,支持random和kmeans++两种方法
    iteration	迭代计算并更新中心点
4. KMeansModel	模型
    predict		预测

1. KMeans对象(定义train方法)

伴生对象:同一个文件中对象名和类名一样,

伴生类和伴生对象的特点是可以相互访问被private修饰的字段

object KMeans{
// 待会初始化中心点的两种方式
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"

/**
 * train函数的参数
 * data:数据样本,格式为RDD[Vector]
 * k:聚类数
 * maxIterations:最大迭代次数
 * runs:算法的并行度,默认为1
 * initializationMode:初始化聚类中心点,支持random、kmeans++(默认)
 * seed:初始化时的随机种子
 * train函数返回一个KMeansModel类型的模型,KMeansModel介绍在下面
*/
def train(
data: RDD[Vector],
k: Int,
maxIterations: Int,
runs: Int,
initializationMode: String,
seed: Long): KMeansModel = {
new KMeans()
.setK(k)
.setMaxIterations(maxIterations)
.setInitializationMode(initializationMode)
.setSeed(seed)
.run(data)
// 调用了run方法,run方法在KMeans类中
}

// 以下的几个def train()略,每个train减少一个参数重新定义一遍
}

KMeansModel:

train 函数训练后,生成 KMeans 模型

KMeansModel 包含的参数:中心点向量

KMeansModel 包含的方法:预测 predict、保存模型 save、加载模型 load

 

2. KMeans类

2.1 参数定义

KMeans类的参数与KMeans对象相比,少了data、多了epsilon和initializationSteps。data是run函数的参数。

/**
 * epsilon:中心点距离的阈值,默认为1e-4
 * initializationSteps:初始步长,默认为5
*/
class KMeans private (
private var k: Int,
private var maxIterations: Int,
private var runs: Int,
private var initializationMode: String,
private var initializationSteps: Int
private var epsilon: Double,
private var seed: Long) extends Serializable with Logging {
/**
 * 设置这些参数的默认值
*/
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong())
/**
 * 我们指定的参数值(k、maxIterations等),通过get-set方法获取
*/
// 参数:k
def getK: Int = k
def setK(k: Int): this.type = {
this.k = k
this
}
// 其他参数略
}

2.2 定义run方法

KMeans类的run方法是训练模型的,该方法主要调用runAlgorithm方法来计算中心点。

因为是迭代计算,所以数据应该被缓存(RDD的persisit)

/**
 * 根据样本数据data进行模型训练,返回值:KMeansModel
*/
def run(data: RDD[vector]): KMeansModel = {

// 如果数据没有被缓存则warning
if (data.getStorageLevel == StorageLevel.NONE){
logWarning("The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached")
}

// 计算L2范数,并缓存。向量的2范数:向量的每个元素的平方和再开平方根.
// 这一步的目的是将数据转换为RDD[VectorWithNorm]格式,以便runAlgorithm方法使用
// data格式为RDD[vector],zippedData格式为RDD[vector, vector的L2范数]
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
val zippedData = data.zip(norms).map { case (v, norm) => 
new VectorWithNorm(v, norm)}	// 详解在下面

// 运行KMeans的核心方法,runAlgorithm方法,计算聚类中心点
val model = runAlgorithm(zippedData)
norms.unpersist()

model
}

演示一下数据格式的转换:

RDD[String] -> RDD[Vector] -> RDD[Double] -> RDD[Vector, Double)] -> RDD[VectorWithNorm]

(1) 首先RDD[String]转成RDD[Vector]

scala> val data = sc.textFile("file:///test/kmeans_data.txt") 	//data格式:RDD[String]
data: org.apache.spark.rdd.RDD[String] = file:///test/kmeans_data.txt MapPartitionsRDD[5] at textFile at <console>:30
scala> data.collect
res0: Array[String] = Array(0.1 0.1, 0.2 0.2, 9.0 9.0, 9.1 9.1, 9.2 9.2)        
scala> val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[6] at map at <console>:31
scala> parsedData.collect			 // parsedData格式:RDD[Vector] 
res1: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.1], [0.2,0.2], [9.0,9.0], [9.1,9.1], [9.2,9.2])

(2)RDD[Vector]转换成RDD[VectorWithNorm] (主要步骤)

scala> val norms = parsedData.map(Vectors.norm(_, 2.0))	// norms格式:RDD[Double]
norms: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[7] at map at <console>:31
scala> norms.collect
res2: Array[Double] = Array(0.14142135623730953, 0.28284271247461906, 12.727922061357855, 12.869343417595164, 13.010764773832474)
scala> val k = parsedData.zip(norms)        // k格式:RDD[Vector, Double)]
k: org.apache.spark.rdd.RDD[(org.apache.spark.mllib.linalg.Vector, Double)] = ZippedPartitionsRDD2[10] at zip at <console>:33
scala> k.collect
res3: Array[(org.apache.spark.mllib.linalg.Vector, Double)] = Array(([0.1,0.1],0.14142135623730953), ([0.2,0.2],0.28284271247461906), ([9.0,9.0],12.727922061357855), ([9.1,9.1],12.869343417595164), ([9.2,9.2],13.010764773832474))
scala> val zippedData = k.map{case (v, norm) => new VectorWithNorm(v, norm)}  
// zippedData格式:RDD[VectorWithNorm]
zippedData: org.apache.spark.rdd.RDD[VectorWithNorm] = MapPartitionsRDD[13] at map at <console>:32
scala> zippedData.collect
res4: Array[VectorWithNorm] = Array([email protected], [email protected], [email protected], [email protected], [email protected])

VectorWithNorm是后面自定义的数据类型:

class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable {
  def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
  def this(array: Array[Double]) = this(Vectors.dense(array))
  /** Converts the vector to a dense vector. */
  def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm) 
}

3. runAlgorithm方法

计算聚类中心点(KMeans算法的核心方法)

(1)首先初始化中心点

(2)计算每个样本与各个中心点的距离,把样本分配到最近的中心点。

同时计数、统计中心点累加的样本值。

(3)将每个簇的样本重新计算中心点,更新中心点

(4)计算新的中心点与更新前中心点的差值,与阈值比较,判断是否结束迭代

/**
 * KMeans算法的核心:runAlgorithm方法
 * 输入data:RDD[VectorWithNorm],输出KMeansModel
*/
private def runAlgorithm(data:RDD[VectorWithNorm]): KMeansModel = {
val sc = data.sparkContext		// 创建sc
// 计算初始化中心点的时间不重要,略

// 初始化中心
val centers = if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data)
}

val active = Array.fill(runs)(true)
val costs = Array.fill(runs)(0.0)
val activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
val iteration = 0

// 迭代运算
while (iteration < maxIterations && !activeRuns.isEmpty) {
type WeightedPoint = (Vector, Long)	// 自定义WeightedPoint类型

// mergeContribs函数用于计算样本点的和sum,后面会调用
def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
/** 
* axpy:y._1 = y._1 + x._1
* 其中下划线表示取值, y._1 是y的第一个元素,y: (Vector, Long)
*/
axpy(1.0, x_1, y._1)
(y._1, x._2 + y._2)		// 返回值,WeightedPoint类型
}

// activeCenters用来计算聚类中心点
// activeCenters格式:由 runs 个 VectorWithNorm 组成
val activeCenters = activeRuns.map(r => centers(r)).toArray
// costAccums用来计算样本所属中心点下的cost值之和(使用了accumulator累加器)
val costAccums = activeRuns.map(_ => sc.accumulator(0.0))
// 广播聚类中心点
val bcActiveCenters = sc.broadcast(activeCenters)

/**
* 计算属于每个中心点的样本点,并对每个簇的样本进行累加和计数
* runs:并行度,k:中心点个数
* sum:中心点对应的所有样本的累加值,counts:中心点样本计数
* contribs:((并行度 i,中心点 j),(sum,count)
* findClosest方法:找到点与中心点最近的中心点
*/
val totalContribs = data.mapPartitions { points =>
val thisActiveCenters = bcActiveCenters.value	// 当前的聚类中心点
val runs = thisActiveCenters.length	// 还剩多少runs存活,runs:并行度
val k = thisActiveCenters(0).length	// k
val dims = thisActiveCenters(0)(0).vector.size	// 中心点的维度

// sums 用来计算每个簇所有样本的向量之和
val sums = Array.fill(runs, k)(Vectors.zeros(dims))		//(见代码演示)
// counts 用来统计每个簇样本点的数量
val counts = Array.fill(runs, k)(0L)

// 计算样本point属于哪个中心点bestCenter,以及在该中心点下的cost值
points.foreach { point =>	
(0 until runs).foreach { i =>	//并行计算
val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
costAccums(i) += cost	//并行度i下的cost之和
axpy(1.0, point.vector, sum)		// sum = sum + point
counts(i)(bestCenter) += 1		// 计数
}
}

// 得到每个簇下样本点的向量和、样本点的数目(i是并行度,j是聚类中心)
val contribs = for (i <- 0 until runs; j <- 0 until k) yield {	
((i, j), (sums(i)(j), counts(i)(j)))		// for yield见代码演示
}
contribs.iterator
}.reduceByKey(mergeContribs).collectAsMap()
// 聚合操作,对于相同的(i, j),i是并行度,j是聚类中心
// 对属于同一中心下的样本向量之和、样本数量进行累加操作

// 更新中心点,sum = sum / count
for ((run, i) <- activeRuns.zipWithIndex) {
val changed = false
val j = 0
while (j < k) {
if (count != 0) {
scal(1.0 / count, sum)		// sum = sum/count
val newCenter = new VectorWithNorm(sum)	// 新中心点
// 比较新旧中心点距离和阈值
if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) 
> epsilon*eplison){
change = true		// 如果大于,则中心点改变了,需要更新中心点
}
centers(run)(j) = newCenter		// 更新中心点
}
j += 1
}
if (!changed) {
active(run) = false		// 所有中心点不再改变了
}
costs(run) = costAccums(i).value
}
activeRuns = activeRuns.filter(active(_))	//过滤中心点已完成收敛的并行计算
iteration += 1
}
new KMeansModel(centers(bestRun).map(_.vector))
}

代码演示:

val sums = Array.fill(runs, k)(Vectors.zeros(dims))

使用 dims 维零向量填充 runs 行 k 列的数组

scala> val sums = Array.fill(1, 3)(Vectors.zeros(2))

sums: Array[Array[org.apache.spark.mllib.linalg.Vector]] = Array(Array([0.0,0.0], [0.0,0.0], [0.0,0.0]))

for循环配合yield:

for循环中的 yield 会把当前的元素记下来,保存在集合中,循环结束后将返回该集合。

scala> for (i <- 1 to 5) yield i

res10: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)

其它

初始化中心点:

初始化中心点的方法,支持random和kmeans++两种方法

看random方法:

private def initRandom(data: RDD[VectorWithNorm]): Array[Array[VectorWithNorm]]={

// 从样本数据data中随机抽取runs*k个数据作为中心点

val sample = data.takeSample(true, runs *k, 第三参数略)...

}

takeSample函数:

类似于sample函数,该函数接受三个参数,第一个参数withReplacement ,表示采样是否放回,true表示有放回的采样,false表示无放回采样;第二个参数num,表示返回的采样数据的个数,这个也是takeSample函数和sample函数的区别;第三个参数seed,表示用于指定的随机数生成器种子。

 

快速距离计算:

KMeans 对象定义了

(1)findClost 方法

快速找到点与所有中心点中最近的一个中心点

(2)fastSquaredDistance 方法

快速计算两点之间的距离

相关文章:

  • 2021-12-17
  • 2022-12-23
  • 2022-12-23
  • 2021-09-27
  • 2021-04-17
  • 2021-10-18
  • 2021-11-30
  • 2021-09-07
猜你喜欢
  • 2022-01-18
  • 2021-05-03
  • 2021-10-09
  • 2022-12-23
  • 2021-12-21
  • 2021-07-28
相关资源
相似解决方案