【问题标题】:Pyspark window function to calculate number of transits between stopsPyspark 窗口函数用于计算停靠点之间的中转次数
【发布时间】:2021-11-07 13:38:51
【问题描述】:

我正在使用Pyspark,我想创建一个执行以下操作的函数:

给定描述火车用户交易的数据:

+----+-------------------+-------+---------------+-------------+-------------+
|USER|       DATE        |LINE_ID|      STOP     | TOPOLOGY_ID |TRANSPORT_ID |
+------------------------+-------+---------------+-------------+-------------+
|John|2021-01-27 07:27:34|      7| King Cross    |       171235|       03    |
|John|2021-01-27 07:28:00|     40| White Chapell |       123582|       03    |  
|John|2021-01-27 07:35:30|      4| Reaven        |       171565|       03    |  
|Tom |2021-01-27 07:27:23|      7| King Cross    |       171235|       03    |    
|Tom |2021-01-27 07:28:30|     40| White Chapell |       123582|       03    |                   
+----+-------------------+-------+---------------+-------------+-------------+

我想获得在 30 分钟内完成 A-B、B-C 等停靠点组合的次数。

因此,假设用户“John”在 7:27 从停靠站“King Cross”前往“White Chapell”,然后在 7:35 从“White Chapell”前往“Reaven”。
与此同时,“Tom”在 7:27 从“King Cross”前往“White Chapell”,然后在 7:32 从“White Chapell”前往“Oxford Circus”。

操作的结果应该是这样的:

+----------------------+-----------------+---------------+-----------+
|          DATE        |   ORIG_STOP     |   DEST_STOP   | NUM_TRANS |
+----------------------+-----------------+---------------+-----------+
|   2021-01-27 07:00:00|  King Cross     | White Chapell |       2   |
|   2021-01-27 07:30:00|  White Chapell  | Reaven        |       1   |              
+----------------------+-----------------+---------------+-----------+

我尝试过使用窗口函数,但无法获得我真正想要的。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql window-functions


    【解决方案1】:

    您可以尝试运行以下命令

    使用 Spark SQL

    在第一个 CTE initial_stop_groups 中,它使用 LEAD 函数确定相关的 ORIGIN 和 DESTINATION 停止和时间。下一个 CTE stop_groups,使用 CASE 表达式和日期函数确定相关的 30 分钟间隔,并过滤掉非组(即没有停止目的地)。最后的投影然后使用 group by 来聚合时间间隔、始发地和目的地组,以计算在相同 30 分钟间隔内产生的 NUM_TRANS

    假设你的数据在input_df

    input_df.createOrReplaceTempView("input_df")
    
    output_df = sparkSession.sql("""
     WITH initial_stop_groups AS (
            SELECT
                DATE as ORIG_DATE,
                LEAD(DATE) OVER (
                    PARTITION BY USER,TRANSPORT_ID
                    ORDER BY DATE
                ) as STOP_DATE,
                STOP as ORIG_STOP,
                LEAD(STOP) OVER (
                    PARTITION BY USER,TRANSPORT_ID
                    ORDER BY DATE
                ) as DEST_STOP
            FROM
                input_df
        ),
        stop_groups AS (
            SELECT 
                CAST(CONCAT(
                  CAST(ORIG_DATE as DATE),
                  ' ',
                  hour(ORIG_DATE),
                  ':',
                  CASE WHEN minute(ORIG_DATE) < 30 THEN '00' ELSE '30' END,
                  ':00'
                ) AS TIMESTAMP) as ORIG_TIME,
                CASE WHEN STOP_DATE IS NOT NULL THEN CAST(CONCAT(
                  CAST(STOP_DATE as DATE),
                  ' ',
                  hour(STOP_DATE),
                  ':',
                  CASE WHEN minute(STOP_DATE) < 30 THEN '00' ELSE '30' END,
                  ':00'
                ) AS TIMESTAMP) ELSE NULL END as STOP_TIME,
                ORIG_STOP,
                DEST_STOP
            FROM 
                initial_stop_groups
            WHERE
                DEST_STOP IS NOT NULL
        )
        SELECT
            STOP_TIME as DATE, 
            ORIG_STOP,
            DEST_STOP,
            COUNT(1) as NUM_TRANS
        FROM
            stop_groups
        WHERE
            (unix_timestamp(STOP_TIME) - unix_timestamp(ORIG_TIME)) <=30*60
            
        GROUP BY
            STOP_TIME, ORIG_STOP, DEST_STOP;
        
    """)
    
    output_df.show()
    
    DATE orig_stop dest_stop num_trans
    2021-01-27T07:00:00.000Z King Cross White Chapell 2
    2021-01-27T07:30:00.000Z White Chapell Reaven 1

    View on DB Fiddle

    • CAST((STOP_TIME - ORIG_TIME) as STRING) IN ('0 seconds','30 minutes') (unix_timestamp(STOP_TIME) - unix_timestamp(ORIG_TIME)) &lt;=30*60 取代

    使用火花 API

    实际代码

    from pyspark.sql import functions as F
    from pyspark.sql import Window
    
    next_stop_window = Window().partitionBy("USER","TRANSPORT_ID").orderBy("DATE")
    
    output_df = (
        input_df.select(
            F.col("DATE").alias("ORIG_DATE"),
            F.lead("DATE").over(next_stop_window).alias("STOP_DATE"),
            F.col("STOP").alias("ORIG_STOP"),
            F.lead("STOP").over(next_stop_window).alias("DEST_STOP"),
        ).where(
            F.col("DEST_STOP").isNotNull()
        ).select(
            F.concat(
                F.col("ORIG_DATE").cast("DATE"),
                F.lit(' '),
                F.hour("ORIG_DATE"),
                F.lit(':'),
                F.when(
                    F.minute("ORIG_DATE") < 30, '00'
                ).otherwise('30'),
                F.lit(':00')
            ).cast("TIMESTAMP").alias("ORIG_TIME"),
            F.concat(
                F.col("STOP_DATE").cast("DATE"),
                F.lit(' '),
                F.hour("STOP_DATE"),
                F.lit(':'),
                F.when(
                    F.minute("STOP_DATE") < 30, '00'
                ).otherwise('30'),
                F.lit(':00')
            ).cast("TIMESTAMP").alias("STOP_TIME"),
            F.col("ORIG_STOP"),
            F.col("DEST_STOP")
        ).where(
            (F.unix_timestamp("STOP_TIME") - F.unix_timestamp("ORIG_TIME")) <= 30*60
            # (F.col("STOP_TIME")-F.col("ORIG_TIME")).cast("STRING").isin(['0 seconds','30 minutes'])
        ).groupBy(
            F.col("STOP_TIME"),
            F.col("ORIG_STOP"),
            F.col("DEST_STOP"),
        ).count().select(
            F.col("STOP_TIME").alias("DATE"),
            F.col("ORIG_STOP"),
            F.col("DEST_STOP"),
            F.col("count").alias("NUM_TRANS"),
        )
        
    )
    output_df.show()
    
    
    DATE orig_stop dest_stop num_trans
    2021-01-27T07:00:00.000Z King Cross White Chapell 2
    2021-01-27T07:30:00.000Z White Chapell Reaven 1

    结果架构

    output_df.printSchema()
    
    root
     |-- DATE: timestamp (nullable = true)
     |-- ORIG_STOP: string (nullable = true)
     |-- DEST_STOP: string (nullable = true)
     |-- NUM_TRANS: long (nullable = false)
    

    重复性设置代码

    data="""+----+-------------------+-------+---------------+-------------+-------------+
    |USER|       DATE        |LINE_ID|      STOP     | TOPOLOGY_ID |TRANSPORT_ID |
    +------------------------+-------+---------------+-------------+-------------+
    |John|2021-01-27 07:27:34|      7| King Cross    |       171235|       03    |
    |John|2021-01-27 07:28:00|     40| White Chapell |       123582|       03    |  
    |John|2021-01-27 07:35:30|      4| Reaven        |       171565|       03    |  
    |Tom |2021-01-27 07:27:23|      7| King Cross    |       171235|       03    |    
    |Tom |2021-01-27 07:28:30|     40| White Chapell |       123582|       03    |                   
    +----+-------------------+-------+---------------+-------------+-------------+
    """
    
    rows = [ [ pc.strip() for pc in line.strip().split("|")[1:-1]] for line in data.strip().split("\n")[3:-1]]
    headers = [pc.strip() for pc in data.strip().split("\n")[1].split("|")[1:-1]]
    
    from pyspark.sql import functions as F
    input_df = sparkSession.createDataFrame(rows,schema=headers)
    input_df = input_df.withColumn("DATE",F.col("DATE").cast("TIMESTAMP"))
    
    
    

    让我知道这是否适合你。

    【讨论】:

    • 看起来差不多!我不确定的一件事是您没有使用时间戳作为日期类型,对吧?
    • @tomruarol 我正在为DATE 使用时间戳类型。我已经更新了答案(包括一个完整的 pyspark api 示例)以显示设置代码(我假设 DATE 是时间戳并按此方式转换)以及将数据类型显示为时间戳的输出模式。让我知道这是否适合您。
    • @ggordon7 答案相当准确,但由于某种原因表现不佳。我试图重现代码,但日期差异会由于转换而导致错误。它们应该被转换为 unix_timestamp 以计算它们之间的差异,但如果你转换它们,它会输出一个空的数据帧。
    • @tomruarol 我在 pyspark 3.1.2 上对此进行了测试。你用的是什么版本?如果这是您发现问题的地方,您可以在 where 子句中尝试 (F.unix_timestamp("STOP_TIME") - F.unix_timestamp("ORIG_TIME")) &lt;= 30*60 而不是 (F.col("STOP_TIME")-F.col("ORIG_TIME")).cast("STRING").isin(['0 seconds','30 minutes'])
    • @ggordon7 我使用的是 Spark 2.4.7。我尝试使用到 unix_time 的转换,但我在某个地方搞砸了。现在它就像一个魅力!非常感谢!!
    猜你喜欢
    • 2022-08-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-03
    • 2020-06-23
    • 2017-06-29
    • 2019-04-23
    相关资源
    最近更新 更多