【问题标题】:Spark window function with condition on current row具有当前行条件的 Spark 窗口函数
【发布时间】:2018-10-20 05:29:08
【问题描述】:

我正在尝试计算给定的order_id 在过去 365 天内有多少订单已付款。这不是问题所在:我使用window function

对我来说比较棘手的地方是:我不想在当前order_idorder_date 之后的payment_date 之后计算订单。

目前,我有这样的事情:

val window: WindowSpec = Window
  .partitionBy("customer_id")
  .orderBy("order_date")
  .rangeBetween(-365*days, -1)

df.withColumn("paid_order_count", count("*") over window)

这将计算客户当前订单前 365 天内的所有订单。

我现在如何合并一个将当前订单的order_date 考虑在内的计数条件?

例子:

+---------+-----------+-------------+------------+
|order_id |order_date |payment_date |customer_id |
+---------+-----------+-------------+------------+
|1        |2017-01-01 |2017-01-10   |A           |
|2        |2017-02-01 |2017-02-10   |A           |
|3        |2017-02-02 |2017-02-20   |A           |

结果表应如下所示:

+---------+-----------+-------------+------------+-----------------+
|order_id |order_date |payment_date |customer_id |paid_order_count |
+---------+-----------+-------------+------------+-----------------+
|1        |2017-01-01 |2017-01-10   |A           |0                |
|2        |2017-02-01 |2017-02-10   |A           |1                |
|3        |2017-02-02 |2017-02-20   |A           |1                |

对于order_id = 3paid_order_count 不应该是2,而是1,因为order_id = 2 是在放置order_id = 3 之后支付的。

希望我能很好地解释我的问题,期待你的想法!

【问题讨论】:

  • 所以你只想要每个订单日期的累积计数基于付款日期?

标签: scala apache-spark apache-spark-sql


【解决方案1】:

很好的问题!!! 有几点说明,使用 rangeBetween 创建一个基于行数而不是值的固定框架,因此在 2 种情况下会出现问题:

  1. 客户并非每天都有订单,因此 365 行窗口可能包含一年前的 order_date 行
  2. 如果客户每天有超过一个订单,则会影响一年的保修期
  3. 1和2的组合

rangeBetween 也不适用于 Date 和 Timestamp 数据类型。

为了解决这个问题,可以使用带有列表和 UDF 的窗口函数:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

  val df = spark.sparkContext.parallelize(Seq(
    (1, "2017-01-01", "2017-01-10", "A")
    , (2, "2017-02-01", "2017-02-10", "A")
    , (3, "2017-02-02", "2017-02-20", "A")
  )
  ).toDF("order_id", "order_date", "payment_date", "customer_id")
    .withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
    .withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))

//      df.printSchema()
//      df.show(false)

  val window = Window.partitionBy("customer_id").orderBy("order_date_ts").rangeBetween(Window.unboundedPreceding, -1)

  val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
      val bottom = top - (days * 60 * 60 * 24).toLong // in spark timestamps are in secconds, calculating the date days ago
      array.count(v => v >= bottom && v < top)
    }
  )

  val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
      .withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))

  res.show(false)

输出:

+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders             |paid_order_count|
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|1       |2017-01-01|2017-01-10  |A          |1483228800   |1484006400     |[]                      |0               |
|2       |2017-02-01|2017-02-10  |A          |1485907200   |1486684800     |[1484006400]            |1               |
|3       |2017-02-02|2017-02-20  |A          |1485993600   |1487548800     |[1484006400, 1486684800]|1               |
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+

在几秒钟内将日期转换为 Spark 时间戳可以提高列表的内存效率。

这是最容易实现的代码,但不是最佳代码,因为列表会占用一些内存,自定义 UDAF 是最好的,但需要更多编码,以后可能会这样做。如果每个客户有数千个订单,这仍然有效。

【讨论】:

    猜你喜欢
    • 2017-07-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多