【问题标题】:Calculate value based on value from same column of the previous row in spark根据spark中上一行的同一列的值计算值
【发布时间】:2019-11-21 04:11:50
【问题描述】:

我有一个问题,我必须使用一个公式来计算一列,该公式使用上一行中完成的计算中的值。

我无法使用withColumn API 解决。

我需要计算一个新列,使用公式:

MovingRate = MonthlyRate + (0.7 * MovingRatePrevious)

... 其中MovingRatePrevious 是前一行的MovingRate

对于第 1 个月,我有该值,因此我不需要重新计算该值,但我需要该值才能计算后续行。我需要按类型进行分区。

这是我的原始数据集:

MovingRate 列中的所需结果:

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    尽管可以使用寡妇函数(请参阅@Leo C 的答案),但我敢打赌,使用groupBy 在每个Type 聚合一次会更高效。然后,分解 UDF 的结果以取回所有行:

    val df = Seq(
      (1, "blue", 0.4, Some(0.33)),
      (2, "blue", 0.3, None),
      (3, "blue", 0.7, None),
      (4, "blue", 0.9, None)
    )
    .toDF("Month", "Type", "MonthlyRate", "MovingRate")
    
    // this udf produces an Seq of Tuple3 (Month, MonthlyRate, MovingRate)
    val calcMovingRate = udf((startRate:Double,rates:Seq[Row]) => rates.tail
      .scanLeft((rates.head.getInt(0),startRate,startRate))((acc,curr) => (curr.getInt(0),curr.getDouble(1),acc._3+0.7*curr.getDouble(1)))
    )
    
    df
      .groupBy($"Type")
      .agg(
        first($"MovingRate",ignoreNulls=true).as("startRate"),
        collect_list(struct($"Month",$"MonthlyRate")).as("rates")
      )
      .select($"Type",explode(calcMovingRate($"startRate",$"rates")).as("movingRates"))
      .select($"Type",$"movingRates._1".as("Month"),$"movingRates._2".as("MonthlyRate"),$"movingRates._3".as("MovingRate"))
      .show()
    

    给予:

    +----+-----+-----------+------------------+
    |Type|Month|MonthlyRate|        MovingRate|
    +----+-----+-----------+------------------+
    |blue|    1|       0.33|              0.33|
    |blue|    2|        0.3|              0.54|
    |blue|    3|        0.7|              1.03|
    |blue|    4|        0.9|1.6600000000000001|
    +----+-----+-----------+------------------+
    

    【讨论】:

      【解决方案2】:

      考虑到每个移动速率都是从前一个速率递归计算的要求的性质,面向列的 DataFrame API 不会发挥作用,尤其是在数据集很大的情况下。

      也就是说,如果数据集不大,一种方法是让 Spark 通过 UDF 逐行重新计算移动速率,并将窗口分区速率列表作为其输入:

      import org.apache.spark.sql.expressions.Window
      
      val df = Seq(
        (1, "blue", 0.4, Some(0.33)),
        (2, "blue", 0.3, None),
        (3, "blue", 0.7, None),
        (4, "blue", 0.9, None),
        (1, "red", 0.5, Some(0.2)),
        (2, "red", 0.6, None),
        (3, "red", 0.8, None)
      ).toDF("Month", "Type", "MonthlyRate", "MovingRate")
      
      val win = Window.partitionBy("Type").orderBy("Month").
        rowsBetween(Window.unboundedPreceding, 0)
      
      def movingRate(factor: Double) = udf( (initRate: Double, monthlyRates: Seq[Double]) =>
        monthlyRates.tail.foldLeft(initRate)( _ * factor + _ )
      )
      
      df.
        withColumn("MovingRate", when($"Month" === 1, $"MovingRate").otherwise(
          movingRate(0.7)(last($"MovingRate", ignoreNulls=true).over(win), collect_list($"MonthlyRate").over(win))
        )).
        show
      // +-----+----+-----------+------------------+
      // |Month|Type|MonthlyRate|        MovingRate|
      // +-----+----+-----------+------------------+
      // |    1| red|        0.5|               0.2|
      // |    2| red|        0.6|              0.74|
      // |    3| red|        0.8|             1.318|
      // |    1|blue|        0.4|              0.33|
      // |    2|blue|        0.3|0.5309999999999999|
      // |    3|blue|        0.7|1.0716999999999999|
      // |    4|blue|        0.9|1.6501899999999998|
      // +-----+----+-----------+------------------+
      

      【讨论】:

      • 我认为这不是很高效...我认为最好将所有费率集中起来,然后再爆炸
      • @Raphael Roth,这是一个很好的问题:Window partitionBygroupBy/explode 的性能,我认为答案都取决于大小写。出于好奇,我在两个解决方案之间做了一个非常粗略的时间检查,发现Window partitionBy 始终比groupBy/explode 做得好一点,比如在一个数据帧上~600ms vs ~900ms,每个数据帧有 10 种类型,每个类型有 10 万个月,随机生成率。
      【解决方案3】:

      您要做的是计算一个递归公式,如下所示:

      x[i] = y[i] + 0.7 * x[i-1]
      

      其中x[i] 是您在i 行的MovingRatey[i] 是您在i 行的MonthlyRate

      问题在于这是一个纯粹的顺序公式。每一行都需要前一行的结果,而前一行的结果又需要前一行的结果。 Spark 是一个并行计算引擎,很难使用它来加速无法真正并行化的计算。

      【讨论】:

      • 这是一个spark数据帧,不大,可以通过驱动收集,有帮助吗?
      猜你喜欢
      • 1970-01-01
      • 2022-11-11
      • 1970-01-01
      • 1970-01-01
      • 2020-08-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-19
      相关资源
      最近更新 更多