我正在使用Spark 2.1.1。
您可以使用spark ml library function 查找列之间的相关性
让我们首先导入类。
import org.apache.spark.sql.functions.corr
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
现在准备输入数据帧:
scala> val seqRow = Seq(
| ("2017-04-27",13,21),
| ("2017-04-26",7,16),
| ("2017-04-25",40,17),
| ("2017-04-24",17,17),
| ("2017-04-21",10,20),
| ("2017-04-20",9,19),
| ("2017-04-19",30,30),
| ("2017-04-18",18,25),
| ("2017-04-14",32,28),
| ("2017-04-13",39,18),
| ("2017-04-12",2,4),
| ("2017-04-11",8,24),
| ("2017-04-10",18,27),
| ("2017-04-07",6,17),
| ("2017-04-06",13,29),
| ("2017-04-05",10,17),
| ("2017-04-04",6,8),
| ("2017-04-03",20,32)
| )
seqRow: Seq[(String, Int, Int)] = List((2017-04-27,13,21), (2017-04-26,7,16), (2017-04-25,40,17), (2017-04-24,17,17), (2017-04-21,10,20), (2017-04-20,9,19), (2017-04-19,30,30), (2017-04-18,18,25), (2017-04-14,32,28), (2017-04-13,39,18), (2017-04-12,2,4), (2017-04-11,8,24), (2017-04-10,18,27), (2017-04-07,6,17), (2017-04-06,13,29), (2017-04-05,10,17), (2017-04-04,6,8), (2017-04-03,20,32))
scala> val rdd = sc.parallelize(seqRow)
rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:34
scala> val input_df = spark.createDataFrame(rdd).toDF("date","amount","prediction").withColumn("residuals",'amount - 'prediction)
input_df: org.apache.spark.sql.DataFrame = [date: string, amount: int ... 2 more fields]
scala> input_df.show(false)
+----------+------+----------+---------+
|date |amount|prediction|residuals|
+----------+------+----------+---------+
|2017-04-27|13 |21 |-8 |
|2017-04-26|7 |16 |-9 |
|2017-04-25|40 |17 |23 |
|2017-04-24|17 |17 |0 |
|2017-04-21|10 |20 |-10 |
|2017-04-20|9 |19 |-10 |
|2017-04-19|30 |30 |0 |
|2017-04-18|18 |25 |-7 |
|2017-04-14|32 |28 |4 |
|2017-04-13|39 |18 |21 |
|2017-04-12|2 |4 |-2 |
|2017-04-11|8 |24 |-16 |
|2017-04-10|18 |27 |-9 |
|2017-04-07|6 |17 |-11 |
|2017-04-06|13 |29 |-16 |
|2017-04-05|10 |17 |-7 |
|2017-04-04|6 |8 |-2 |
|2017-04-03|20 |32 |-12 |
+----------+------+----------+---------+
2017-04-14 行的 2017-04-14 和 2017-04-13 的值不匹配,因为我减去 amount - prediction 的 residuals
现在继续计算所有列之间的相关性。
此方法用于计算列数较多且需要各列与其他列之间的相关性时的相关性。
首先我们删除不需要计算相关性的列
scala> val drop_date_df = input_df.drop('date)
drop_date_df: org.apache.spark.sql.DataFrame = [amount: int, prediction: int ... 1 more field]
scala> drop_date_df.show
+------+----------+---------+
|amount|prediction|residuals|
+------+----------+---------+
| 13| 21| -8|
| 7| 16| -9|
| 40| 17| 23|
| 17| 17| 0|
| 10| 20| -10|
| 9| 19| -10|
| 30| 30| 0|
| 18| 25| -7|
| 32| 28| 4|
| 39| 18| 21|
| 2| 4| -2|
| 8| 24| -16|
| 18| 27| -9|
| 6| 17| -11|
| 13| 29| -16|
| 10| 17| -7|
| 6| 8| -2|
| 20| 32| -12|
+------+----------+---------+
由于相关性多于2列,我们需要找到相关性矩阵。
为了计算 相关矩阵,我们需要 RDD[Vector],正如您在 spark 相关性示例中看到的那样。
scala> val dense_rdd = drop_date_df.rdd.map{row =>
| val first = row.getAs[Integer]("amount")
| val second = row.getAs[Integer]("prediction")
| val third = row.getAs[Integer]("residuals")
| Vectors.dense(first.toDouble,second.toDouble,third.toDouble)}
dense_rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[62] at map at <console>:40
scala> val correlMatrix: Matrix = Statistics.corr(dense_rdd, "pearson")
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0 0.40467032516705076 0.782939330961529
0.40467032516705076 1.0 -0.2520531290688281
0.782939330961529 -0.2520531290688281 1.0
列的顺序保持不变,但您丢失了列名。
您可以找到有关相关矩阵结构的好资源。
因为您想找到残差与其他两列的相关性。
我们可以探索其他选择
蜂巢 corr UDAF
scala> drop_date_df.createOrReplaceTempView("temp_table")
scala> val corr_query_df = spark.sql("select corr(amount,residuals) as amount_residuals_corr,corr(prediction,residuals) as prediction_residual_corr from temp_table")
corr_query_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
scala> corr_query_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
| 0.7829393309615287| -0.252053129068828|
+---------------------+------------------------+
Spark corr 函数 link
scala> val corr_df = drop_date_df.select(
| corr('amount,'residuals).as("amount_residuals_corr"),
| corr('prediction,'residuals).as("prediction_residual_corr"))
corr_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
scala> corr_df.show
+---------------------+------------------------+
|amount_residuals_corr|prediction_residual_corr|
+---------------------+------------------------+
| 0.7829393309615287| -0.252053129068828|
+---------------------+------------------------+