【问题标题】:Spark SQL - How to find total number of transactions on an hourly basisSpark SQL - 如何按小时查找事务总数
【发布时间】:2016-11-14 10:31:36
【问题描述】:

例如,如果我有一个包含transaction numbertransaction date [as timestamp] 列的表,我如何找出hourly basis 上的事务总数?

是否有任何 Spark sql 函数可用于这种范围计算?

【问题讨论】:

    标签: apache-spark-sql spark-dataframe


    【解决方案1】:

    您可以使用from_unixtime 函数。

    val sqlContext = new SQLContext(sc)
    
    import org.apache.spark.sql.functions._
    import sqlContext.implicits._
    
    val df = // your dataframe, assuming transaction_date is timestamp in seconds
    df.select('transaction_number, hour(from_unixtime('transaction_date)) as 'hour)
          .groupBy('hour)
          .agg(count('transaction_number) as 'transactions)
    

    结果:

    +----+------------+
    |hour|transactions|
    +----+------------+
    |  10|        1000|
    |  12|        2000|
    |  13|        3000|
    |  14|        4000|
    |  ..|        ....|
    +----+------------+
    

    【讨论】:

      【解决方案2】:

      这里我试图给出一些方法的指针,而不是完整的代码,请看这个

      Time Interval Literals : Using interval literals, it is possible to perform subtraction or addition of an arbitrary amount of time from a date or timestamp value. This representation can be useful when you want to add or subtract a time period from a fixed point in time. For example, users can now easily express queries like “Find all transactions that have happened during the past hour”. An interval literal is constructed using the following syntax: [sql]INTERVAL value unit[/sql]


      下面是python中的方法。您可以修改以下示例以匹配您的要求,即相应的交易日期开始时间和结束时间。而不是 id 在你的情况下它的交易号。

      # Import functions.
      from pyspark.sql.functions import *
      # Create a simple DataFrame.
      data = [
        ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
        ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
        ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
      df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
      df = df.select(
        df.start_time.cast("timestamp").alias("start_time"),
        df.end_time.cast("timestamp").alias("end_time"),
        df.id)
      # Get all records that have a start_time and end_time in the
      # same day, and the difference between the end_time and start_time
      # is less or equal to 1 hour.
      condition = \
        (to_date(df.start_time) == to_date(df.end_time)) & \
        (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
      df.filter(condition).show()
      +———————+———————+—+
      |start_time           |            end_time |id |
      +———————+———————+—+
      |2015-01-02 23:00:00.0|2015-01-02 23:59:59.0|2  |
      +———————+———————+—+
      

      使用此方法,您可以应用分组功能来查找您的案例中的交易总数。

      上面是python代码,那么scala呢?

      上面使用的expr function 也可以在 scala 中使用

      也可以看看spark-scala-datediff-of-two-columns-by-hour-or-minute 下面描述..

      import org.apache.spark.sql.functions._
          val diff_secs_col = col("ts1").cast("long") - col("ts2").cast("long")
          val df2 = df1
            .withColumn( "diff_secs", diff_secs_col )
            .withColumn( "diff_mins", diff_secs_col / 60D )
            .withColumn( "diff_hrs",  diff_secs_col / 3600D )
            .withColumn( "diff_days", diff_secs_col / (24D * 3600D) )
      

      【讨论】:

        猜你喜欢
        • 2020-11-06
        • 1970-01-01
        • 1970-01-01
        • 2020-01-08
        • 2021-12-15
        • 1970-01-01
        • 1970-01-01
        • 2021-06-14
        • 2015-05-30
        相关资源
        最近更新 更多