【Title】: How to create a PySpark Dataframe Column which iterates through a circular list
【Posted】: 2020-01-20 14:31:26
【Description】:

How can I add a column to this PySpark dataframe that cycles through a circular list, as shown below:

   # `sc` is assumed to be an existing SparkContext (e.g. in a Spark shell)
   from pyspark.sql.functions import col

   df = sc.parallelize([['2019-08-29 01:00:00'],
                        ['2019-08-29 02:00:00'],
                        ['2019-08-29 03:00:00'],
                        ['2019-08-29 04:00:00'],
                        ['2019-08-29 05:00:00'],
                        ['2019-08-29 06:00:00'],
                        ['2019-08-29 07:00:00'],
                        ['2019-08-29 08:00:00'],
                        ['2019-08-29 09:00:00'],
                        ['2019-08-29 10:00:00']]) \
           .toDF(['DATETIME']) \
           .withColumn('DATETIME', col('DATETIME').cast('timestamp'))

Desired result:

+-------------------+---+
|           DATETIME|NUM|
+-------------------+---+
|2019-08-29 01:00:00|  1|
|2019-08-29 02:00:00|  2|
|2019-08-29 03:00:00|  3|
|2019-08-29 04:00:00|  4|
|2019-08-29 05:00:00|  1|
|2019-08-29 06:00:00|  2|
|2019-08-29 07:00:00|  3|
|2019-08-29 08:00:00|  4|
|2019-08-29 09:00:00|  1|
|2019-08-29 10:00:00|  2|
+-------------------+---+

Many thanks.

【Comments】:

    Tags: python pyspark pyspark-dataframes circular-list


    【Solution 1】:

    You can use row_number to generate an id and then the modulus operator (%) to turn it into a rotating id. Since row_number starts at 1, subtract 1 before taking the modulus so the cycle starts at 1:

    from pyspark.sql.functions import col
    from pyspark.sql import functions as f
    from pyspark.sql.window import Window

    w = Window.orderBy("DATETIME")

    df \
        .withColumn("id", f.row_number().over(w)) \
        .withColumn("NUM", ((col("id") - 1) % 4) + 1) \
        .drop("id") \
        .show(100, False)
    

    Output:

    +-------------------+---+
    |DATETIME           |NUM|
    +-------------------+---+
    |2019-08-29 01:00:00|1  |
    |2019-08-29 02:00:00|2  |
    |2019-08-29 03:00:00|3  |
    |2019-08-29 04:00:00|4  |
    |2019-08-29 05:00:00|1  |
    |2019-08-29 06:00:00|2  |
    |2019-08-29 07:00:00|3  |
    |2019-08-29 08:00:00|4  |
    |2019-08-29 09:00:00|1  |
    |2019-08-29 10:00:00|2  |
    +-------------------+---+
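
    If the circular list holds arbitrary values rather than 1 to 4, the same row_number trick can index into an array literal. Here is a minimal sketch under that assumption: the `cycle` list below is hypothetical, and passing a column index to element_at requires Spark 3.0+.

    from pyspark.sql import functions as f
    from pyspark.sql.window import Window

    cycle = ['a', 'b', 'c', 'd']  # hypothetical circular list of values
    w = Window.orderBy("DATETIME")
    lookup = f.array(*[f.lit(x) for x in cycle])

    # row_number() is 1-based and element_at() is 1-based, so shift down by 1,
    # wrap with %, then shift back up to walk through the array circularly.
    df.withColumn(
        "NUM",
        f.element_at(lookup, (f.row_number().over(w) - 1) % len(cycle) + 1),
    ).show(truncate=False)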
    

    【Discussion】:

      【Solution 2】:

      I process the original dataframe with the reduce function (this is a copy-pasteable example):

      from functools import reduce

      from pyspark.sql import SparkSession
      from pyspark.sql import Window
      from pyspark.sql.functions import row_number
      from pyspark.sql.functions import lit
      from pyspark.sql.functions import col
      from pyspark.sql.functions import when

      # A SparkSession (rather than a bare SparkContext) is needed for toDF()
      spark = SparkSession.builder.getOrCreate()
      sc = spark.sparkContext
      step = 4  # rows per cycle

      df = sc.parallelize([['2019-08-29 01:00:00'],
                           ['2019-08-29 02:00:00'],
                           ['2019-08-29 03:00:00'],
                           ['2019-08-29 04:00:00'],
                           ['2019-08-29 05:00:00'],
                           ['2019-08-29 06:00:00'],
                           ['2019-08-29 07:00:00'],
                           ['2019-08-29 08:00:00'],
                           ['2019-08-29 09:00:00'],
                           ['2019-08-29 10:00:00']]) \
              .toDF(['DATETIME']) \
              .withColumn('DATETIME', col('DATETIME').cast('timestamp'))

      df = df.withColumn('tmp_num', row_number().over(Window.orderBy('DATETIME')))
      df = df.withColumn('num', lit('A'))  # temporary placeholder value
      rows = df.count()
      idxs = list(range(1, rows, step))  # first row index of each block

      # Logic is here: each pass renumbers one block of `step` rows as 1..step
      def operate_df(df_, idx):
          return df_.select(df_.DATETIME,
                            df_.num,
                            df_.tmp_num,
                            df_.tmp_num.between(idx, idx + step - 1).alias('block')
                            ).withColumn('num', when(col('block'),
                                                     row_number().over(Window.partitionBy('block').orderBy('DATETIME')))
                                                     .otherwise(col('num')))

      result_df = reduce(operate_df, idxs, df)
      result_df = result_df.drop('tmp_num', 'block')  # drop auxiliary columns
      result_df.show()
      

      This gives me this output:

      +-------------------+---+
      |           DATETIME|num|
      +-------------------+---+
      |2019-08-29 01:00:00|  1|
      |2019-08-29 02:00:00|  2|
      |2019-08-29 03:00:00|  3|
      |2019-08-29 04:00:00|  4|
      |2019-08-29 05:00:00|  1|
      |2019-08-29 06:00:00|  2|
      |2019-08-29 07:00:00|  3|
      |2019-08-29 08:00:00|  4|
      |2019-08-29 09:00:00|  1|
      |2019-08-29 10:00:00|  2|
      +-------------------+---+
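
      A caveat that applies to both answers: a window with orderBy but no partitionBy pulls every row into a single partition, which can become a bottleneck on large data. One way around that, sketched below as an illustration rather than part of either answer, is the RDD's zipWithIndex:

      step = 4  # length of the cycle

      # zipWithIndex assigns consecutive 0-based indices following the RDD's
      # order, without first collecting every row into a single partition the
      # way row_number() over a global window does.
      result = (df.orderBy('DATETIME').rdd
                  .zipWithIndex()
                  .map(lambda pair: (pair[0]['DATETIME'], pair[1] % step + 1))
                  .toDF(['DATETIME', 'NUM']))
      result.show()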
      

      【Discussion】:
