【问题标题】:Spark Scala - Moving Average aligning result with pandas functionSpark Scala - 移动平均对齐结果与熊猫函数
【发布时间】:2020-02-07 21:37:32
【问题描述】:

我正在尝试将熊猫的移动平均函数转换为 spark scala。但是,似乎两者都会产生不同的结果。

熊猫代码:

dummy = {"value": [10, 20, 30,40, 50, 60, 70, 80, 90, 100],
         "name": ["aa" for i in range(0,10)]}
df= pd.DataFrame(dummy, columns=['name', 'value'])
pprint(df)
pprint(df.groupby('name').rolling(2).mean().shift(1))

火花代码

val df =List(("ABC", 10),
              ("ABC", 20),
              ("ABC", 30),
              ("ABC", 40),
              ("ABC", 50),
              ("ABC", 60),
              ("ABC", 70),
              ("ABC", 80),
              ("ABC", 90),
              ("ABC", 100)
            ).
          toDF("name", "value")

val window = Window.partitionBy($"name").orderBy($"value").rowsBetween(-2,1)
val df2 = df.withColumn("rolling_average", avg($"value") over(window))
display(df2)

熊猫输出

        value
name         
aa   0    NaN
     1    NaN
     2   15.0
     3   25.0
     4   35.0
     5   45.0
     6   55.0
     7   65.0
     8   75.0
     9   85.0

火花输出

+----+-----+---------------+
|name|value|rolling_average|
+----+-----+---------------+
| ABC|   10|           15.0|
| ABC|   20|           20.0|
| ABC|   30|           25.0|
| ABC|   40|           35.0|
| ABC|   50|           45.0|
| ABC|   60|           55.0|
| ABC|   70|           65.0|
| ABC|   80|           75.0|
| ABC|   90|           85.0|
| ABC|  100|           90.0|
+----+-----+---------------+

有没有办法让 spark 窗口函数产生与 Pandas 函数相似的输出?

【问题讨论】:

    标签: python pandas scala apache-spark


    【解决方案1】:

    刚刚将起始点之间的行从 1 更改为 -1。

    得到几乎相同的结果,然后将 10 的第二个条目更改为 null,以提供完全相同的结果。

    %scala
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{avg,col,lit,when}
    val df =List(("ABC", 10),
                  ("ABC", 20),
                  ("ABC", 30),
                  ("ABC", 40),
                  ("ABC", 50),
                  ("ABC", 60),
                  ("ABC", 70),
                  ("ABC", 80),
                  ("ABC", 90),
                  ("ABC", 100)
                ).
              toDF("name", "value")
    
    val window = Window.partitionBy($"name").orderBy($"value").rowsBetween(-2, -1)
    val df2 = df.withColumn("rolling_average", avg($"value") over(window)).withColumn("rolling_average", when(col("rolling_average")===10, lit(null)).otherwise(col("rolling_average")))
    df2.show()
    
    +----+-----+---------------+
    |name|value|rolling_average|
    +----+-----+---------------+
    | ABC|   10|           null|
    | ABC|   20|           null|
    | ABC|   30|           15.0|
    | ABC|   40|           25.0|
    | ABC|   50|           35.0|
    | ABC|   60|           45.0|
    | ABC|   70|           55.0|
    | ABC|   80|           65.0|
    | ABC|   90|           75.0|
    | ABC|  100|           85.0|
    +----+-----+---------------+
    

    【讨论】:

      猜你喜欢
      • 2017-02-24
      • 1970-01-01
      • 2018-10-01
      • 2020-06-12
      • 2021-08-26
      • 2021-06-21
      • 2020-07-24
      • 1970-01-01
      • 2019-02-13
      相关资源
      最近更新 更多