【问题标题】:Spark ML: Data de-normalizationSpark ML:数据反规范化
【发布时间】:2019-02-05 16:55:55
【问题描述】:

我需要对使用 Spark 中的 ML 的 MinMaxScaler 方法规范化的数据进行反规范化。

我能够按照以下步骤规范化我的数据:Spark: convert an RDD[LabeledPoint] to a Dataframe to apply MinMaxScaler, and after scaling get the normalized RDD[LabeledPoint],我之前发布的。

例如,原来的df 有两个前列,缩放后的结果是:

+------+--------------------+--------------------+
|labels|            features|      featuresScaled|
+------+--------------------+--------------------+
|   1.0|[6.0,7.0,42.0,1.1...|[1.0,0.2142857142...|
|   1.0|[6.0,18.0,108.0,3...|[1.0,1.0,1.0,1.0,...|
|   1.0|[5.0,7.0,35.0,1.4...|[0.0,0.2142857142...|
|   1.0|[5.0,8.0,40.0,1.6...|[0.0,0.2857142857...|
|   1.0|[6.0,4.0,24.0,0.6...|[1.0,0.0,0.0,0.0,...|
+------+--------------------+--------------------+

问题是,现在我需要做相反的过程:去规范化。

为此,我需要 features 向量内每个特征列的 minmax 值,以及要非规范化的值。

要获得minmax,我要求MinMaxScaler如下:

val df_fitted = scaler.fit(df_all)
val df_fitted_original_min = df_fited.originalMin   // Vector
val df_fitted_original_max = df_fited.originalMax   // Vector

df_fited_original_min[1.0,1.0,7.0,0.007,0.052,0.062,1.0,1.0,7.0,1.0]
df_fited_original_max[804.0,553.0,143993.0,537.0,1.0,1.0,4955.0,28093.0,42821.0,3212.0]

另一方面,我的 DataFrame 是这样的:

+--------------------+-----+--------------------+--------------------+-----+-----+--------------------+--------------------+--------------------+-----+
|               col_0|col_1|               col_2|               col_3|col_4|col_5|               col_6|               col_7|               col_8|col_9|
+--------------------+-----+--------------------+--------------------+-----+-----+--------------------+--------------------+--------------------+-----+
|0.009069428120139292|  0.0|9.015488712438252E-6|2.150418860440459E-4|  1.0|  1.0|0.001470074844665...|2.205824685144127...|2.780971210319238...|  0.0|
|0.008070826019024355|  0.0|3.379696051366339...|2.389342641479033...|  1.0|  1.0|0.001308210192425627|1.962949264985630...|1.042521123176856...|  0.0|
|0.009774715414895803|  0.0|1.299590589291292...|1.981673063697640...|  1.0|  1.0|0.001584395736407...|2.377361424206848...| 4.00879434193585E-5|  0.0|
|0.009631155146285946|  0.0|1.218569739510422...|2.016021040879828E-4|  1.0|  1.0|0.001561125874539...|2.342445354515269...|3.758872615157643E-5|  0.0|

现在,我需要应用以下等式来获取新值,但我不知道如何实现。

X_original = ( X_scaled * (max - min) ) + min

对于 DF 中的每个位置,我必须将此方程与相应的 maxmin 值应用到向量中。

例如:DF 的第一行第一列是0.009069428120139292。在同一列中,对应的minmax 值为:1.0804.0。 所以,非规范化的值是:

X_den = ( 0.009069428120139292 * (804.0 - 1.0) ) + 1.0

有必要澄清在程序期间首先标准化的 DF 已被修改。因此,我需要应用反规范化(如果没有,最简单的方法是保留原始 DF 的副本)。

【问题讨论】:

    标签: scala apache-spark dataframe machine-learning


    【解决方案1】:

    您“简单地”以相反的顺序应用逆运算。该等式在文档here 中。感兴趣的代码是:

    X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
    X_scaled = X_std * (max - min) + min
    

    您现在拥有X_saled 值的数据集,并且您想要恢复原始X 值。您的直接问题是您在转换中丢失一些基本信息。 X_scaled 是范围 [0, 1] 上的一组数据;您无法知道原始范围是多少。

    要完成这项工作,请找到并保留原始的 min 和 `max 值。现在,很容易反转每个元素的线性变换:

    X_original = X_scaled * (max - min) + min
    

    你能从那里拿走吗?

    【讨论】:

    • 谢谢!但问题是标准化应用于符合 LabeledPoint 的向量。现在,我需要 Vector 中每个特征的最小值和最大值,以便在新的 DataFrame 中应用它们。为了更清楚,我将在我的帖子中添加一个示例。
    • 我明白了。看来你已经知道你需要做什么了:获取每个向量的最小值和最大值,并应用我给你的转换来恢复原始值。
    • 更简单,为什么不保留原始值的副本?
    • 是的,但要反规范化的数据与原始数据略有不同。我需要使用由最小值和最大值确定的范围来转换它们
    【解决方案2】:

    我从下面的https://stackoverflow.com/a/50314767/9759150 中得到了答案,再加上对我的问题的稍微适应,我已经完成了反规范化过程。

    让我们将normalized_df 视为具有 10 列的数据框(在我的问题中显示):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._
    
    val updateFunction = (columnValue: Column, minValue: Int, maxValue: Int) =>
        (columnValue * ( lit(maxValue) - lit(minValue))) + lit(minValue)
    
    val updateColumns = (df: DataFrame, minVector: Vector, maxVector: Vector, updateFunction: (Column, Int, Int) => Column) => {
        val columns = df.columns
        minVector.toArray.zipWithIndex.map{
          case (updateValue, index) =>
            updateFunction( col(columns(index.toInt)), minVector(index).toInt, maxVector(index).toInt ).as(columns(index.toInt))
        }
    }
    
    var dfUpdated = normalized_df.select(
      updateColumns(normalized_df, df_fitted_original_min, df_fitted_original_max, updateFunction) :_*
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-24
      • 2017-04-26
      • 1970-01-01
      • 2020-07-31
      • 2018-10-14
      相关资源
      最近更新 更多