-
Python配置 :Py4j模块、Pyspark模块
-
Windows 环境变量:Eclipse开发Pyspark
一. 线性回归
1.什么是回归?
从大量的函数结果和自变量反推回函数表达式的过程就是回归。线性回归是利用数理统计中回归分析来确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法。
-
一元线性回归:
只包括一个自变量(x)和一个因变量(y),且二者的关系可用一条直线近似表示,这种回归分析称为一元线性回归分析。公式: y = w0+w1x1
-
多元线性回归:
如果回归分析中包括两个或两个以上的自变量,且因变量和自变量之间是线性关系,则称为多元线性回归分析。公式:Ε(w0+w1x1+...+ wnxn)
代表一系列的真实离散点的y值,
代表一系列得到的直线在对应的x处的y值,一元线性回归中=。
m代表有m个离散点,前面的 是为了方便求导加上的。
表示的是一系列权重,,….
这个公式也叫做最小二乘法误差公式。error的值如果是0说明确定的这条直线穿过了所有的点,对于离散分散的一组点,error的值不可能为0。
通过求导的方式,理论上可以确定error的最小值,但是由于是一组数据,无法确定error最小下对应的这一组权重 。
正向的求导不能得到一组权重值,就不能确定线性回归的公式。那么可以根据数据集来穷举法反向的试,来确定线性回归的公式:
2. 梯度下降法调节参数
假设和error的函数关系中,中只有一个维度,那么这个关系其实就是和error的关系,反应图上就是一维平面关系,如果中有两个维度和,那么这个反映到图上就是个三维空间关系,以此类推。
如果中只有一个维度,如下图:
图中①,②,③,④,⑤,⑥,⑦都是图像的切线(斜率),在切点处可以求得对应的error值,如何调节模型得到一组 值可以使得到的error值更小,如图,沿着①->②->③方向调节和沿着④->⑤->⑥方向调节,可以使error值更小,这种沿着斜率绝对值减少的方向,沿着梯度的负方向每次迭代调节的方法就叫做梯度下降法。
梯度下降是迭代法的一种,可以用于求解最小二乘问题,在求解error函数的最小值时,可以通过梯度下降法来一步步的迭代求解,得到最小化的error函数和模型参数值。
每次按照步长(学习率)来调节值,迭代求出error的最小值,这里的error不可能有一个固定的最小值,只会向着最小值的方向收敛。
判断模型error误差值收敛,也就是停止迭代的两种方式:
-
- 指定一个error值,当迭代处理过程中得到的error值小于指定的值时,停止迭代。
- 设置迭代次数,当迭代次数达到设置的次数时,停止迭代。
迭代周期:从调整参数开始到计算出error值判断是否大于用户指定的error是一个迭代周期。
当步长比较小时,迭代次数多,训练模型的时间比较长,当步长比较大时,有可能迭代的次数也比较多,训练模型的时间也会相对比较长,需要找到一个合适的步长。
3. 模型过拟合
训练模型都会将数据集分为两部分,一般会将0.8比例的数据集作为训练集,将0.2比例的数据集作为测试集,来训练模型。模型过拟合就是训练出来的模型在训练集上表现很好,但是在测试集上表现较差的一种现象,也就是模型对已有的训练集数据拟合的非常好(误差值等于0),对于测试集数据拟合的非常差,模型的泛化能力比较差。
4.如何判断模型发生过拟合?
训练出模型后,可以在训练集中测试下模型的正确率,在测试集中测试下模型的正确率,如果两者差别很大(测试集正确率小,训练集正确率大),那么模型就有可能发生了过拟合。
5.Spark Mllib 线性回归案例
object LinearRegression { def main(args: Array[String]) { // 构建Spark对象 val conf = new SparkConf().setAppName("LinearRegressionWithSGD").setMaster("local") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) // sc.setLogLevel("WARN") //读取样本数据 val data_path1 = "lpsa.data" val data = sc.textFile(data_path1) val examples = data.map { line => val parts = line.split(',') val y = parts(0) val xs = parts(1) LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) }.cache() val train2TestData = examples.randomSplit(Array(0.8, 0.2), 1) /* * 迭代次数 * 训练一个多元线性回归模型收敛(停止迭代)条件: * 1、error值小于用户指定的error值 * 2、达到一定的迭代次数 */ val numIterations = 100 //在每次迭代的过程中 梯度下降算法的下降步长大小 0.1 0.2 0.3 0.4 val stepSize = 1 val miniBatchFraction = 1 val lrs = new LinearRegressionWithSGD() //让训练出来的模型有w0参数,就是由截距 lrs.setIntercept(true) //设置步长 lrs.optimizer.setStepSize(stepSize) //设置迭代次数 lrs.optimizer.setNumIterations(numIterations) //每一次下山后,是否计算所有样本的误差值,1代表所有样本,默认就是1.0 lrs.optimizer.setMiniBatchFraction(miniBatchFraction) val model = lrs.run(train2TestData(0)) println(model.weights) println(model.intercept) // 对样本进行测试 val prediction = model.predict(train2TestData(1).map(_.features)) val predictionAndLabel = prediction.zip(train2TestData(1).map(_.label)) val print_predict = predictionAndLabel.take(20) println("prediction" + "\t" + "label") for (i <- 0 to print_predict.length - 1) { println(print_predict(i)._1 + "\t" + print_predict(i)._2) } // 计算测试集平均误差 val loss = predictionAndLabel.map { case (p, v) => val err = p - v Math.abs(err) }.reduce(_ + _) val error = loss / train2TestData(1).count println(s"Test RMSE = " + error) // 模型保存 val ModelPath = "model" model.save(sc, ModelPath) // val sameModel = LinearRegressionModel.load(sc, ModelPath) sc.stop() }