【Question Title】: Spark Window function using more than one column
【Posted】: 2019-07-31 02:17:37
【Question Description】:

I have this dataframe showing the send time and open time for each user:

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
             ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
             ("user2", "2018-04-05 18:00:00", null),
             ("user2", "2018-04-05 19:00:00", null)              
            ).toDF("id", "sendTime", "openTime")

+-----+-------------------+-------------------+
|   id|           sendTime|           openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00|               null|
|user2|2018-04-05 19:00:00|               null|
+-----+-------------------+-------------------+

Now I want to count, for each user, how many opens occurred in the two hours before each send time. I am using a window function partitioned by user, but I don't know how to compare values in the sendTime column against values in the openTime column. The resulting dataframe should look like this:

+-----+-------------------+-------------------+-----+
|   id|           sendTime|           openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|    2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|    2|
|user2|2018-04-05 18:00:00|               null|    3|
|user2|2018-04-05 19:00:00|               null|    2|
+-----+-------------------+-------------------+-----+
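
To make the expected counts concrete: the user2 18:00 row has count 3 because the opens at 16:50, 17:40 and 17:50 all happened within the two hours before that send, even though the 18:00 send itself was never opened.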

This is as far as I got, but it doesn't give me what I need:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => F}

var df2 = df.withColumn("sendUnix", F.unix_timestamp($"sendTime")).withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))

【Question Discussion】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    This seems difficult to do using Window functions alone, because you can't reference the upper bound of sendTime when trying to derive whether an openTime value falls within the last 2 hours of that bound.

    Spark 2.4 brings higher-order functions, which you can read about here (https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html). With these, you can collect all the openTime values inside a window with the collect_list function, then use the higher-order function filter to drop the openTimes that lie outside the two hours before sendTime. Finally, counting the values left in the list gives the count you are after. Here is my code.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
                 ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
                 ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
                 ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
                 ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
                 ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
                 ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
                 ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
                 ("user2", "2018-04-05 18:00:00", null),
                 ("user2", "2018-04-05 19:00:00", null)              
                ).toDF("id", "sendTime", "openTime")
    
    var df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
                .withColumn("openUnix", unix_timestamp($"openTime"))
    
    // window over each user's sends in the two hours up to the current send
    val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
    
    val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))
    
    df3.show(false)
    
    +-----+-------------------+-------------------+----------+----------+------------------------------------+
    |id   |sendTime           |openTime           |sendUnix  |openUnix  |opened                              |
    +-----+-------------------+-------------------+----------+----------+------------------------------------+
    |user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
    |user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
    |user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
    |user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
    |user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
    |user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
    |user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
    |user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
    |user2|2018-04-05 18:00:00|null               |1522947600|null      |[1522946400, 1522947000, 1522943400]|
    |user2|2018-04-05 19:00:00|null               |1522951200|null      |[1522946400, 1522947000]            |
    +-----+-------------------+-------------------+----------+----------+------------------------------------+
    
    val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
            "size(filter(opened, x -> x < sendUnix AND  x > sendUnix - 7200)) as count")
    
    df4.show(false)
    
    +-----+-------------------+-------------------+----------+----------+-----+
    |id   |sendTime           |openTime           |sendUnix  |openUnix  |count|
    +-----+-------------------+-------------------+----------+----------+-----+
    |user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
    |user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
    |user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
    |user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2    |
    |user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
    |user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
    |user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
    |user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1    |
    |user2|2018-04-05 18:00:00|null               |1522947600|null      |3    |
    |user2|2018-04-05 19:00:00|null               |1522951200|null      |2    |
    +-----+-------------------+-------------------+----------+----------+-----+
    

    【Discussion】:

    • Looks great! There's just one small issue that makes the count in the third row from the bottom differ from the expected output I posted. It comes from the rangeBetween(-2*60*60, 0) in my code, which you apparently used as well. That frame only includes sends from the two hours before the current send time, whereas we need to look across all earlier sends and restrict only the opens to the last two hours. If you remove the rangeBetween(-2*60*60, 0), I think we get the expected result.
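
    A minimal sketch of that suggestion (assuming Spark 2.4+, reusing df2 and the imports from the answer above): with an ordered window's default frame of unboundedPreceding to currentRow, every earlier open is collected, and the two-hour cutoff is then applied only inside the filter lambda.

        // Default frame (all sends up to and including the current one) instead of
        // rangeBetween(-7200, 0): collect every earlier open, then restrict to the
        // two hours before sendUnix inside the lambda.
        val wAll = Window.partitionBy($"id").orderBy($"sendUnix")

        val fixed = df2
          .withColumn("opened", collect_list($"openUnix").over(wAll))
          .selectExpr("id", "sendTime", "openTime",
            "size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")
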
    【Solution 2】:

    Here you go. Code that solves the problem:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    val df1 = df.withColumn("sendTimeStamp", unix_timestamp(col("sendTime")))
                .withColumn("openTimeStamp", unix_timestamp(col("openTime")))
    
    // collect the opens from each user's sends in the two hours up to the current send
    val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)
    
    var df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))
    
    // one row per (send, candidate open) pair
    var df3 = df2.select('*, explode('list).as("prevTimeStamp"))
    
    // keep only the opens that happened in the two hours before the send
    df3.groupBy('id, 'sendTime)
       .agg(max('openTime).as("openTime"),
            sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1).otherwise(0)).as("count"))
       .show
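
    Note that explode drops rows whose collected list is empty, so a send with no opens anywhere in its window would disappear from the result. If that matters for your data, a minimal variation is explode_outer, which keeps such rows with a null prevTimeStamp (the when(...).otherwise(0) then yields a count of 0):

        // explode_outer keeps rows with an empty list (prevTimeStamp is null),
        // so sends with no nearby opens still appear with count 0
        var df3 = df2.select('*, explode_outer('list).as("prevTimeStamp"))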
    

    Please accept the answer if it solves your problem.

    【Discussion】:
