[Posted]: 2017-11-01 10:28:47
[Question]:
I have a CSV file with three columns (all strings), and I am using the following code for clustering on Zeppelin.
Here is the code:
case class kmeansScore(k: String, score: String,j: String )
val rawData = sc.textFile("/resources/data/v1.csv")
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
import org.apache.spark.mllib.linalg._
val labelsAndData = rawData.zipWithIndex.flatMap {
  case (line, index) =>
    if (index == 0) {
      None
    } else {
      val buffer = line.split(',').toBuffer
      buffer.remove(1, 4)
      val label = buffer.remove(buffer.length - 1)
      val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
      Some((label, vector))
    }
}
import org.apache.spark.mllib.clustering._
def distance(a: Vector, b: Vector) = math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
def distToCentroid(datum: Vector, model: KMeansModel) = {
  val cluster = model.predict(datum)
  val centroid = model.clusterCenters(cluster)
  distance(centroid, datum)
}
import org.apache.spark.rdd._
val dataAsArray = labelsAndData.values.map(_.toArray).cache()
dataAsArray.first().length
But I get this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 138, localhost): java.lang.IndexOutOfBoundsException: 1
at scala.collection.mutable.ArrayBuffer.remove(ArrayBuffer.scala:158)
What is the problem? I am working in Zeppelin at https://my.datascientistworkbench.com/tools/zeppelin-notebook/
[Comments]:
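For what it's worth, the exception can be reproduced without Spark at all. `ArrayBuffer.remove(n, count)` requires the buffer to hold at least `n + count` elements (here 1 + 4 = 5), but a row split from a three-column CSV line has only 3. A minimal sketch, assuming rows really have three fields as described above:

```scala
import scala.collection.mutable.ArrayBuffer

// A row parsed from one line of a three-column CSV, as in the question.
val row = "a,b,c".split(',').to[ArrayBuffer]

// remove(1, 4) asks to drop 4 elements starting at index 1, but only
// 2 elements exist from index 1 onward, so the bounds check fails.
val outOfBounds = try {
  row.remove(1, 4)
  false
} catch {
  case _: IndexOutOfBoundsException => true
}

println(outOfBounds)  // true: the same IndexOutOfBoundsException as in the stack trace
```

This suggests the `buffer.remove(1, 4)` call was written for a dataset with more columns and needs to be adapted (or dropped) for a three-column file.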
-
The link you provided requires a login.
-
@MattiLyra Yes, I have an account. What is your point?
-
What use is that link to anyone here who doesn't have an account? Are we supposed to look at the notebook?
Tags: apache-spark cluster-analysis apache-zeppelin