【问题标题】:Find Normal value using Min and Max from scala data-frame使用 scala 数据帧中的 Min 和 Max 查找 Normal 值
【发布时间】:2017-08-28 11:44:48
【问题描述】:

我有一个包含 39 列的数据框,每一列都有不同的正常范围。 通过使用正常范围,我想找出正常值并放入 0 否则放入 1。

这是我所做的,但我想为 39 列做。

val test :(Double => Double) =  (value: Double) =>
{
    if(value >= 45 && value <= 62) 0
    else 1
}

但我不明白如何对每一列使用不同的值。

例如: 我有这个DF

+--------------------+---------+-------------------------+---------+
|a                   |b        |c                        |d        |
+--------------------+---------+-------------------------+---------+
|               207.0|     40.0|                    193.0|     39.0|
|                98.0|     17.0|                    193.0|     15.0|
|               207.0|     13.0|                    193.0|     17.0|
|               207.0|     26.0|                    193.0|     23.0|
|               207.0|     35.0|                    193.0|     24.0|
|               207.0|     91.0|                    193.0|     45.0|
|               207.0|     40.0|                    193.0|     37.0|
|               207.0|     23.0|                    193.0|     23.0|
|               207.0|     26.0|                    193.0|     22.0|
|               207.0|     39.0|                    193.0|     34.0|

我希望使用范围的结果如下所示

col  range
a   50-160
b   1-21
c   5-40
d   7-27

如果值在范围内则为0,否则为1

+--------------------+---------+-------------------------+---------+
|a                   |b        |c                        |d        |
+--------------------+---------+-------------------------+---------+
|                 1.0|      1.0|                      1.0|      1.0|
|                 0.0|      0.0|                      1.0|      0.0|
|                 1.0|      0.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      1.0|
|                 1.0|      1.0|                      1.0|      1.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      0.0|
|                 1.0|      1.0|                      1.0|      1.0|

I want to do this for 39 columns.(scala/pyspark preferred)

【问题讨论】:

    标签: scala dataset spark-dataframe user-defined-functions unsupervised-learning


    【解决方案1】:

    您应该定义一个用户定义函数 (UDF),然后将其应用于您想要的每一列。

    这里是关于 Scala 用户定义函数的文档。它相当完整,我鼓励你阅读它。

    这里有一段摘录,可以帮助你快速了解我想去哪里:

    scala> df.withColumn("upper", upper('text)).show
    +---+-----+-----+
    | id| text|upper|
    +---+-----+-----+
    |  0|hello|HELLO|
    |  1|world|WORLD|
    +---+-----+-----+
    
    // You could have also defined the UDF this way
    val upperUDF = udf { s: String => s.toUpperCase }
    
    // or even this way
    val upperUDF = udf[String, String](_.toUpperCase)
    
    scala> df.withColumn("upper", upperUDF('text)).show
    +---+-----+-----+
    | id| text|upper|
    +---+-----+-----+
    |  0|hello|HELLO|
    |  1|world|WORLD|
    +---+-----+-----+
    

    您看到您的功能适用于整个列,结果将是一个新列。因此,您的函数应如下所示:

    def isInRange(e: Number, min: Number, max: Number): Boolean = (e < max && e > min)
    

    那么,对于给定的 minValue 和 maxValue,你要做的就是:

    myDF.withColumn("isInRange_a", udf(x => isInRange(x, minValue, maxValue).apply(myDF("a")))
    

    您现在可以将其应用于包含 (varName, maxValue, minValue) 的给定列表/数据帧是:

    • map/reduce 操作,您可以计算每一列是否是给定的范围。然后,您将加入给定的密钥(我不太了解您的问题,所以在这里我无法为您提供帮助)。此解决方案有效,但随着数据的增长会变得非常低效,因为您可能有几个看起来相似的键。

    • 递归操作,其目标是执行以下操作:myDF.whithColumn(...).withColumn(...).withColumn(...)

    我会选择第二种解决方案,因为键看起来很相似。

    你是怎么做到的?

    def applyMyUDFRecursively(myDF: DataFrame, List[MyRange]: rangesList): DataFrame =
    if (rangesList == null || rangesList.isEmpty) myDF
    else applyMyUDFRecursively(
        myDF.withColumn(myDF.withColumn("isInRange_" + rangesList.head._0, udf(x => isInRange(x, rangesList.head._1, rangesList.head._2).apply(myDF(rangesList.head._0))), rangesList.tail)
    

    现在您申请了所有列,但您的列可能过多。做这样的事情:

    resultDF.drop(rangesList.map(case x => x._0).collect: _*)
    

    注意类型归属,将drop函数应用于map/collect时获得的列表内的所有元素

    with val MyRange = Seq(varName: String, min: Number, max: Number)

    例如。对于您的 DataFrame,它应该如下所示(更简单的版本):

    def recApply(myDF: DataFrame, cols: List[String]): DataFrame =
    if (cols == null || cols.isEmpty) myDF
    else recApply(myDF.withColumn(myDF.withColumn("isInRange_" + col.head, udf(x => test(x).apply(myDF(cols.head))), cols.tail)
    

    然后,将此函数应用于您的 DF 并存储您的结果:

    val my_result = recApply(myDF, myDF.cols)
    

    【讨论】:

    • 如果有什么不清楚的地方请告诉我,我希望我给你钥匙让你现在自己处理这个问题,如果这适合你,请不要犹豫将问题标记为已回答
    • 感谢您的回答,这是我得到的最详细的回答,但仍然无法处理所有 39 列,您能否仅在上面的示例数据上显示它会更有帮助
    • 另外,你可以在上面看到我的 udf
    • 你的UDF在哪里?
    猜你喜欢
    • 1970-01-01
    • 2021-12-14
    • 1970-01-01
    • 2021-05-04
    • 1970-01-01
    • 1970-01-01
    • 2018-01-10
    • 2017-06-30
    • 2015-02-18
    相关资源
    最近更新 更多