【问题标题】:pyspark sql Add different Qtr start_date, End_date for exploded rowspyspark sql 为展开的行添加不同的 Qtr start_date、End_date
【发布时间】:2020-10-02 06:27:27
【问题描述】:

我有一个包含 start_date、end_date、sales_target 的数据框。我添加了代码来识别日期范围之间的季度数,因此能够使用一些 UDF 将 sales_target 拆分为季度数。

df = sqlContext.createDataFrame([("2020-01-01","2020-12-31","15"),("2020-04-01","2020-12-31","11"),("2020-07-01","2020-12-31","3")], ["start_date","end_date","sales_target"])

+----------+----------+------------+
|start_date| end_date |sales_target|
+----------+----------+------------+
|2020-01-01|2020-12-31|          15|
|2020-04-01|2020-12-31|          11|
|2020-07-01|2020-12-31|           3|
+----------+----------+------------+

以下是计算季度数后的数据框,并使用 UDF 函数拆分 sales_target。

spark.sql('select *, round(months_between(end_date, start_date)/3) as noq from df_temp').createOrReplaceTempView("df_temp")
spark.sql("select *, st_udf(cast(sales_target as integer), cast(noq as integer)) as sales_target from df_temp").createOrReplaceTempView("df_temp")
+----------+----------+--------+---------------+ 
|start_date| end_date |num_qtrs|sales_target_n | 
+----------+----------+--------+---------------+ 
|2020-01-01|2020-12-31|       4| [4,4,4,3]     | 
|2020-04-01|2020-12-31|       3| [4,4,3]       | 
|2020-07-01|2020-12-31|       2| [2,1]         | 
+----------+----------+--------+---------------+

分解 sales_target 后,我​​能够得到以下结果:

+----------+----------+--------+-------------+---------------+------------------+
|start_date| end_date |num_qtrs|sales_target |sales_target_n | sales_target_new | 
+----------+----------+--------+-------------+---------------+------------------+  
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 4                |
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 4                |
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 4                |
|2020-01-01|2020-12-31|       4|        15   |  [4,4,4,3]    | 3                |
|2020-04-01|2020-12-31|       3|        11   |  [4,4,3]      | 4                | 
|2020-04-01|2020-12-31|       3|        11   |  [4,4,3]      | 4                |
|2020-04-01|2020-12-31|       3|        11   |  [4,4,3]      | 3                |
|2020-07-01|2020-12-31|       2|         3   |  [2,1]        | 2                |
|2020-07-01|2020-12-31|       2|         3   |  [2,1]        | 1                |
+----------+----------+--------+-------------+---------------+------------------+

我需要帮助来根据 num_qtrs 值为每行添加不同的开始/结束日期。我需要得到一个如下的数据框。

+----------+----------+--------+-------------+------------------+--------------+--------------+
|start_date| end_date |num_qtrs|sales_target | sales_target_new |new_start_date| new_end_date | 
+----------+----------+--------+-------------+------------------+--------------+--------------+  
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 4                |2020-01-01    |2020-03-31    |
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 4                |2020-04-01    |2020-06-30    |
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 4                |2020-07-01    |2020-09-30    |
|2020-01-01|2020-12-31|       4| [4,4,4,3]   | 3                |2020-10-01    |2020-12-31    |
|2020-04-01|2020-12-31|       3| [4,4,3]     | 4                |2020-04-01    |2020-06-30    | 
|2020-04-01|2020-12-31|       3| [4,4,3]     | 4                |2020-07-01    |2020-09-30    |
|2020-04-01|2020-12-31|       3| [4,4,3]     | 3                |2020-10-01    |2020-12-31    |
|2020-07-01|2020-12-31|       2| [2,1]       | 2                |2020-07-01    |2020-09-30    |
|2020-07-01|2020-12-31|       2| [2,1]       | 1                |2020-10-01    |2020-12-31    |
+----------+----------+--------+-------------+------------------+--------------+--------------+ 

有人可以帮助我使用 pyspark 代码示例以实现上述预期结果。

更新序列错误: 谢谢

【问题讨论】:

  • 你的 spark 版本是什么?
  • 是的,它的 databricks 运行时 6.3 与 apache spark 2.4.4 和 python 3.73。谢谢。

标签: python sql pyspark apache-spark-sql mysql-python


【解决方案1】:

试试这个-

需要start_dateend_date来计算new_start_datenew_end_date

加载提供的测试数据

   val data =
      """
        |start_date| end_date |sales_target
        |2020-01-01|2020-12-31|          15
        |2020-04-01|2020-12-31|          11
        |2020-07-01|2020-12-31|           3
      """.stripMargin

    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()
    /**
      * +-------------------+-------------------+------------+
      * |start_date         |end_date           |sales_target|
      * +-------------------+-------------------+------------+
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |
      * |2020-07-01 00:00:00|2020-12-31 00:00:00|3           |
      * +-------------------+-------------------+------------+
      *
      * root
      * |-- start_date: timestamp (nullable = true)
      * |-- end_date: timestamp (nullable = true)
      * |-- sales_target: integer (nullable = true)
      */

计算new_start_datenew_end_date

 val processedDF = df.withColumn("new_start_date", explode(sequence(to_date($"start_date"), to_date($"end_date"),
      expr("interval 3 month"))))
      .withColumn("new_end_date",
        date_sub(coalesce(lead("new_start_date", 1)
          .over(Window.partitionBy("start_date").orderBy("new_start_date")), to_date($"end_date")), 1)
      )

    processedDF.orderBy("start_date", "new_start_date").show(false)
    processedDF.printSchema()

    /**
      * +-------------------+-------------------+------------+--------------+------------+
      * |start_date         |end_date           |sales_target|new_start_date|new_end_date|
      * +-------------------+-------------------+------------+--------------+------------+
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-01-01    |2020-03-31  |
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-04-01    |2020-06-30  |
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-07-01    |2020-09-30  |
      * |2020-01-01 00:00:00|2020-12-31 00:00:00|15          |2020-10-01    |2020-12-30  |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |2020-04-01    |2020-06-30  |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |2020-07-01    |2020-09-30  |
      * |2020-04-01 00:00:00|2020-12-31 00:00:00|11          |2020-10-01    |2020-12-30  |
      * |2020-07-01 00:00:00|2020-12-31 00:00:00|3           |2020-07-01    |2020-09-30  |
      * |2020-07-01 00:00:00|2020-12-31 00:00:00|3           |2020-10-01    |2020-12-30  |
      * +-------------------+-------------------+------------+--------------+------------+
      *
      * root
      * |-- start_date: timestamp (nullable = true)
      * |-- end_date: timestamp (nullable = true)
      * |-- sales_target: integer (nullable = true)
      * |-- new_start_date: date (nullable = false)
      * |-- new_end_date: date (nullable = true)
      */

【讨论】:

  • 谢谢你试试这个 Someshwar。希望这适用于 pyspark,以及我在示例中显示的数据框,因为我的数据框中有重复的开始日期/结束日期。
  • 我收到错误 Someshwar,未定义名称序列。你能帮忙吗
  • 您使用的是哪个版本的 spark?
  • 它的 databricks 运行时 6.3 与 apache spark 2.4.4 和 python 3.73。在我的问题中添加了错误消息的屏幕截图。谢谢
  • 您是否从pyspark.sql.functions 导入了所有函数。检查文档,序列在 2.4.4 spark.apache.org/docs/2.4.4/api/python/…
【解决方案2】:
package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Qvartal extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val dataDF = Seq(
    ("2020-01-01", "2020-12-31", 4, List(4,4,4,3)),
    ("2020-04-01", "2020-12-31", 3, List(4,4,3)),
    ("2020-07-01", "2020-12-31", 2, List(2,1))
    ).toDF("start_date", "end_date", "num_qtrs", "sales_target_n")

  val listStartEnd = udf((b: String, e: String) => {
    List("2020-01-01", "2020-04-01", "2020-07-01", "2020-10-01").filter(i => i >= b && i <= e)
      .zip(List("2020-03-31", "2020-06-30", "2020-09-30", "2020-12-31").filter(i => i >= b && i <= e))
  })

  val resDF = dataDF.withColumn("new_start_end_date", lit(listStartEnd('start_date, 'end_date)))

  resDF.show(false)
//  +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
//  |start_date|end_date  |num_qtrs|sales_target_n|new_start_end_date                                                                                      |
//  +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
//  |2020-01-01|2020-12-31|4       |[4, 4, 4, 3]  |[[2020-01-01, 2020-03-31], [2020-04-01, 2020-06-30], [2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]|
//  |2020-04-01|2020-12-31|3       |[4, 4, 3]     |[[2020-04-01, 2020-06-30], [2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]                          |
//  |2020-07-01|2020-12-31|2       |[2, 1]        |[[2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]                                                    |
//  +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+

  val r11 = resDF
    .withColumn("sales_target_n1", explode('sales_target_n))
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())

  val r12 = r11.select(
    'monotonically_increasing_id,
    'start_date,
    'end_date,
    'num_qtrs,
    'sales_target_n1
  )

  val r21 = resDF
    .withColumn("new_start_end_date_1", explode('new_start_end_date))
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())

  val r22 = r21.select(
    'monotonically_increasing_id,
    'start_date,
    'end_date,
    'num_qtrs,
    'new_start_end_date_1
  )

  val resultDF = r12.join(r22,
    r22.col("monotonically_increasing_id") === r12.col("monotonically_increasing_id"),
  "inner")
    .select(
      r12.col("start_date"),
      r12.col("end_date"),
      r12.col("num_qtrs"),
      r12.col("sales_target_n1").alias("sales_target_n"),
      r22.col("new_start_end_date_1")
    )
    .withColumn("new_start_date", col("new_start_end_date_1").getItem("_1"))
    .withColumn("new_end_date", col("new_start_end_date_1").getItem("_2"))
    .drop("new_start_end_date_1")

  resultDF.show(false)
  //      +----------+----------+--------+--------------+--------------+------------+
  //      |start_date|end_date  |num_qtrs|sales_target_n|new_start_date|new_end_date|
  //      +----------+----------+--------+--------------+--------------+------------+
  //      |2020-01-01|2020-12-31|4       |4             |2020-01-01    |2020-03-31  |
  //      |2020-01-01|2020-12-31|4       |4             |2020-04-01    |2020-06-30  |
  //      |2020-01-01|2020-12-31|4       |4             |2020-07-01    |2020-09-30  |
  //      |2020-01-01|2020-12-31|4       |3             |2020-10-01    |2020-12-31  |
  //      |2020-04-01|2020-12-31|3       |4             |2020-04-01    |2020-06-30  |
  //      |2020-04-01|2020-12-31|3       |4             |2020-07-01    |2020-09-30  |
  //      |2020-04-01|2020-12-31|3       |3             |2020-10-01    |2020-12-31  |
  //      |2020-07-01|2020-12-31|2       |2             |2020-07-01    |2020-09-30  |
  //      |2020-07-01|2020-12-31|2       |1             |2020-10-01    |2020-12-31  |
  //      +----------+----------+--------+--------------+--------------+------------+

}

【讨论】:

  • 我如何分解这些列以作为具有不同开始日期/结束日期的单独行插入?你能帮我吗,因为我无法得到它。比
  • 添加到我的答案中,请爆炸,请参阅 - @Yuva
  • 是的,我明白了,你能告诉我这里的 listStartEnd 是什么吗,希望你不难设置季度的开始和结束日期。我希望它基于 START DATE 和 END DATE 列,因为可以是任何年份,而不仅仅是 2020 年。如果你能确认我会试试这个。谢谢。
【解决方案3】:

在应用您的 UDF 后,将以下视为您的输入数据框。

输入:

+----------+----------+--------+--------------+
|start_date|  end_date|num_qtrs|sales_target_n|
+----------+----------+--------+--------------+
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|
|2020-07-01|2020-12-31|       2|        [2, 1]|
+----------+----------+--------+--------------+

您可以使用row_numberadd_monthsdate_add的组合来获得您想要的输出,如下所示,

from pyspark.sql.functions import explode, row_number, expr
from pyspark.sql import Window

window = Window.partitionBy('start_date').orderBy(desc("sales_target_new"))

df.withColumn('sales_target_new', explode('sales_target_n')).\
withColumn('row_num', row_number().over(window)).\
withColumn('new_start_date', expr("add_months(start_date, (row_num-1) * 3)")).\
withColumn('new_end_date', expr("add_months(date_add(start_date, -1), row_num * 3)")).\
orderBy('start_date', 'row_num').show()

输出:

+----------+----------+--------+--------------+----------------+-------+--------------+------------+
|start_date|  end_date|num_qtrs|sales_target_n|sales_target_new|row_num|new_start_date|new_end_date|
+----------+----------+--------+--------------+----------------+-------+--------------+------------+
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               4|      1|    2020-01-01|  2020-03-31|
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               4|      2|    2020-04-01|  2020-06-30|
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               4|      3|    2020-07-01|  2020-09-30|
|2020-01-01|2020-12-31|       4|  [4, 4, 4, 3]|               3|      4|    2020-10-01|  2020-12-31|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|               4|      1|    2020-04-01|  2020-06-30|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|               4|      2|    2020-07-01|  2020-09-30|
|2020-04-01|2020-12-31|       3|     [4, 4, 3]|               3|      3|    2020-10-01|  2020-12-31|
|2020-07-01|2020-12-31|       2|        [2, 1]|               2|      1|    2020-07-01|  2020-09-30|
|2020-07-01|2020-12-31|       2|        [2, 1]|               1|      2|    2020-10-01|  2020-12-31|
+----------+----------+--------+--------------+----------------+-------+--------------+------------+

您可以根据自己的要求修改window

【讨论】:

  • 谢谢 noufel13,这是我收到的最好的,并且已经接受了答案。
猜你喜欢
  • 2021-05-19
  • 2021-09-27
  • 2019-05-17
  • 1970-01-01
  • 2021-06-15
  • 2020-09-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多