【问题标题】:Using Windows to group 5 mins timeframe使用 Windows 分组 5 分钟时间范围
【发布时间】:2020-04-25 23:17:41
【问题描述】:

csv 文件是:

#+----+-----------+-------------------+
#|col1|       col2|          timestamp|
#+----+-----------+-------------------+
#|   0|Town Street|01-02-2017 06:01:00|
#|   0|Town Street|01-02-2017 06:03:00|
#|   0|Town Street|01-02-2017 06:05:00|
#|   0|Town Street|01-02-2017 06:06:00|
#|   0|Town Street|02-02-2017 10:01:00|
#|   0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+

比较每个日期的时间,看看是否有 5 分钟的差异,如果他们是数他们

输出:

 #+----+-----------+-------------------+
#|col1|       col2|          timestamp|
#+----+-----------+-------------------+
#|   0|Town Street|01-02-2017 06:01:00|
#|   0|Town Street|01-02-2017 06:03:00|
#|   0|Town Street|01-02-2017 06:05:00|
#|   0|Town Street|01-02-2017 06:06:00|
#|   0|Town Street|02-02-2017 10:01:00|
#|   0|Town Street|02-02-2017 10:05:00|
#+----+-----------+-------------------+

立即编码:

from pyspark.sql import SQLContext
import pyspark.sql.functions as F

    def my_main(sc, my_dataset_dir):
        sqlContext = SQLContext(sc)
        df = sqlContext.read.csv(my_dataset_dir,sep=';').rdd.zipWithIndex().filter(lambda x: x[1] > 1).map(lambda x: x[0]).toDF(['status','title','datetime'])

这段代码只给出了 5 分钟窗口的空结果。

【问题讨论】:

    标签: python csv pyspark windowing


    【解决方案1】:

    不确定这是否正是您想要的,但它应该将您推向正确的方向。您可以将时间戳转换为 timestamptypedatetype。在 seconds(300) 中创建 windowpartitionBy 日期和 rangebetween 时间戳。

    #df.show()
    #sampledataframe
    #+----+-----------+-------------------+
    #|col1|       col2|          timestamp|
    #+----+-----------+-------------------+
    #|   0|Town Street|01-02-2017 06:01:00|
    #|   0|Town Street|01-02-2017 06:03:00|
    #|   0|Town Street|01-02-2017 06:05:00|
    #|   0|Town Street|01-02-2017 06:06:00|
    #|   0|Town Street|02-02-2017 10:01:00|
    #|   0|Town Street|02-02-2017 10:05:00|
    #+----+-----------+-------------------+
    
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    w=Window().partitionBy("date").orderBy(F.col("timestamp").cast("long")).rangeBetween(Window.currentRow,60*5)
    
    df.withColumn("timestamp", F.to_timestamp("timestamp",'MM-dd-yyyy HH:mm:ss'))\
      .withColumn("date", F.to_date("timestamp"))\
      .withColumn('collect', F.size(F.collect_list("timestamp").over(w))).filter("collect>1")\
      .select(F.date_format("date","yyyy-MM-dd").alias("date"), F.array(F.date_format("timestamp","HH:mm:ss"),F.col("collect")).alias("time"))\
      .orderBy("date").show()
    
    #+----------+-------------+
    #|      date|         time|
    #+----------+-------------+
    #|2017-01-02|[06:01:00, 4]|
    #|2017-01-02|[06:05:00, 2]|
    #|2017-01-02|[06:03:00, 3]|
    #|2017-02-02|[10:01:00, 2]|
    #+----------+-------------+
    

    【讨论】:

    • 代码似乎没有计数,我有一个更长的测试集,即使组大于 2,它也只会将计数设为 2。
    • 我添加了不同的样本数据,在我的集群中,它每 5 分钟计算一次计数。仔细检查代码和你的输出。
    猜你喜欢
    • 2011-05-19
    • 1970-01-01
    • 2017-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-11
    相关资源
    最近更新 更多