【问题标题】:Pyspark : forward fill with last observation for a DataFramePyspark:前向填充数据帧的最后一次观察
【发布时间】:2016-07-01 09:33:48
【问题描述】:

使用 Spark 1.5.1,

我一直在尝试用我的 DataFrame 一列的最后一个已知观察值转发填充 null 值

可以从一个空值开始,在这种情况下,我会用第一个已知的观察值反向填充这个空值。但是,如果代码太复杂,这点可以跳过。

在这个post 中,zero323 为一个非常相似的问题提供了 Scala 的解决方案。

但是,我不了解 Scala,也没有成功在 Pyspark API 代码中“翻译”它。可以用 Pyspark 做到这一点吗?

感谢您的帮助。

下面是一个简单的示例输入:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | null 
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | null   
| 1             | 2015-12-05 | null     
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | null
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | null      
| 2             | 2015-12-03 | null     
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | null   
| 2             | 2015-12-06 | U4

以及预期的输出:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | U1
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | U1
| 1             | 2015-12-05 | U1
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | U2
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | U1
| 2             | 2015-12-03 | U3
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | U3
| 2             | 2015-12-06 | U4

【问题讨论】:

  • 如果我明白了逻辑,我不是。用户和cookie之间的关系是多对多的?另外你如何定义顺序?行顺序在 Spark SQL 中并不是特别没有意义(不是在任何 SQLish 环境中)
  • 抱歉,我忘记在示例中包含时间戳(我对其进行了编辑)。我在示例中引入了 Cookie_ID 变量,以表明我必须通过 cookie 转发填充空值。感谢您的帮助。
  • 你找到解决方案了吗?

标签: apache-spark pyspark apache-spark-sql spark-dataframe


【解决方案1】:

Cloudera 发布了一个名为 spark-ts 的库,它提供了一套有用的方法来处理 Spark 中的时间序列和顺序数据。该库支持多种时间窗方法,用于根据序列中的其他数据估算数据点。

http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/

【讨论】:

    【解决方案2】:

    显示了来自 pyspark 中 Spark / Scala: forward fill with last observation 的分区示例代码。这仅适用于可以分区的数据。

    加载数据

    values = [
        (1, "2015-12-01", None),
        (1, "2015-12-02", "U1"),
        (1, "2015-12-02", "U1"),
        (1, "2015-12-03", "U2"),
        (1, "2015-12-04", None),
        (1, "2015-12-05", None),
        (2, "2015-12-04", None),
        (2, "2015-12-03", None),
        (2, "2015-12-02", "U3"),
        (2, "2015-12-05", None),
    ]
    rdd = sc.parallelize(values)
    df = rdd.toDF(["cookie_id", "c_date", "user_id"])
    df = df.withColumn("c_date", df.c_date.cast("date"))
    df.show()
    

    DataFrame 是

    +---------+----------+-------+
    |cookie_id|    c_date|user_id|
    +---------+----------+-------+
    |        1|2015-12-01|   null|
    |        1|2015-12-02|     U1|
    |        1|2015-12-02|     U1|
    |        1|2015-12-03|     U2|
    |        1|2015-12-04|   null|
    |        1|2015-12-05|   null|
    |        2|2015-12-04|   null|
    |        2|2015-12-03|   null|
    |        2|2015-12-02|     U3|
    |        2|2015-12-05|   null|
    +---------+----------+-------+
    

    用于对分区进行排序的列

    # get the sort key
    def getKey(item):
        return item.c_date
    

    填充函数。如有必要,可用于填写多列。

    # fill function
    def fill(x):
        out = []
        last_val = None
        for v in x:
            if v["user_id"] is None:
                data = [v["cookie_id"], v["c_date"], last_val]
            else:
                data = [v["cookie_id"], v["c_date"], v["user_id"]]
                last_val = v["user_id"]
            out.append(data)
        return out
    

    转为rdd,分区,排序,填补缺失值

    # Partition the data
    rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
    # Sort the data by date
    rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
    # fill missing value and flatten
    rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
    # discard the key
    rdd = rdd.map(lambda v: v[1])
    

    转回DataFrame

    df_out = sqlContext.createDataFrame(rdd)
    df_out.show()
    

    输出是

    +---+----------+----+
    | _1|        _2|  _3|
    +---+----------+----+
    |  1|2015-12-01|null|
    |  1|2015-12-02|  U1|
    |  1|2015-12-02|  U1|
    |  1|2015-12-03|  U2|
    |  1|2015-12-04|  U2|
    |  1|2015-12-05|  U2|
    |  2|2015-12-02|  U3|
    |  2|2015-12-03|  U3|
    |  2|2015-12-04|  U3|
    |  2|2015-12-05|  U3|
    +---+----------+----+
    

    【讨论】:

    【解决方案3】:

    希望您发现这个前向填充功能很有用。它是使用本机 pyspark 函数编写的。 udf 和 rdd 都没有被使用(它们都非常慢,尤其是 UDF!)。

    让我们使用@Sid 提供的示例。

    values = [
        (1, "2015-12-01", None),
        (1, "2015-12-02", "U1"),
        (1, "2015-12-02", "U1"),
        (1, "2015-12-03", "U2"),
        (1, "2015-12-04", None),
        (1, "2015-12-05", None),
        (2, "2015-12-04", None),
        (2, "2015-12-03", None),
        (2, "2015-12-02", "U3"),
        (2, "2015-12-05", None),
    ] 
    
    df = spark.createDataFrame(values, ['cookie_ID', 'Time', 'User_ID'])
    

    功能:

    def cum_sum(df, sum_col , order_col, cum_sum_col_nm='cum_sum'):  
        '''Find cumulative sum of a column. 
        Parameters 
        -----------
        sum_col : String 
            Column to perform cumulative sum. 
        order_col : List 
            Column/columns to sort for cumulative sum. 
        cum_sum_col_nm : String
            The name of the resulting cum_sum column. 
    
        Return
        -------
        df : DataFrame
            Dataframe with additional "cum_sum_col_nm". 
    
        '''
        df = df.withColumn('tmp', lit('tmp')) 
    
        windowval = (Window.partitionBy('tmp') 
                     .orderBy(order_col)
                     .rangeBetween(Window.unboundedPreceding, 0)) 
    
        df = df.withColumn('cum_sum', sum(sum_col).over(windowval).alias('cumsum').cast(StringType()))
        df = df.drop('tmp') 
        return df   
    
    
    def forward_fill(df, order_col, fill_col, fill_col_name=None):
        '''Forward fill a column by a column/set of columns (order_col).  
        Parameters:
        ------------
        df: Dataframe 
        order_col: String or List of string
        fill_col: String (Only work for a column for this version.) 
    
        Return:
        ---------
        df: Dataframe 
            Return df with the filled_cols. 
        '''
    
        # "value" and "constant" are tmp columns created ton enable forward fill. 
        df = df.withColumn('value', when(col(fill_col).isNull(), 0).otherwise(1))
        df = cum_sum(df, 'value', order_col).drop('value')  
        df = df.withColumn(fill_col, 
                    when(col(fill_col).isNull(), 'constant').otherwise(col(fill_col))) 
    
        win = (Window.partitionBy('cum_sum') 
                  .orderBy(order_col)) 
    
        if not fill_col_name:
            fill_col_name = 'ffill_{}'.format(fill_col)
    
        df = df.withColumn(fill_col_name, collect_list(fill_col).over(win)[0])
        df = df.drop('cum_sum')
        df = df.withColumn(fill_col_name, when(col(fill_col_name)=='constant', None).otherwise(col(fill_col_name)))
        df = df.withColumn(fill_col, when(col(fill_col)=='constant', None).otherwise(col(fill_col)))
        return df   
    

    让我们看看结果。

    ffilled_df = forward_fill(df, 
                              order_col=['cookie_ID', 'Time'], 
                              fill_col='User_ID', 
                              fill_col_name = 'User_ID_ffil')
    ffilled_df.sort(['cookie_ID', 'Time']).show()   
    

    【讨论】:

    【解决方案4】:

    另一种解决方法是尝试这样的方法:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    window = (
        Window
        .partitionBy('cookie_id')
        .orderBy('Time')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    
    final = (
        joined
        .withColumn('UserIDFilled', F.last('User_ID', ignorenulls=True).over(window))
    )
    

    所以它所做的是它根据分区键和顺序列构建您的窗口。它还告诉窗口回顾窗口内的所有行直到当前行。最后,在每一行,您返回最后一个不为 null 的值(记住,根据您的窗口,它包括您的当前行)

    【讨论】:

      【解决方案5】:
      // Forward filling
      w1 = Window.partitionBy('cookie_id').orderBy('c_date').rowsBetween(Window.unboundedPreceding,0)
      w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
      
      //Backward filling
      final_df = df.withColumn('UserIDFilled', F.coalesce(F.last('user_id', True).over(w1),
                                                          F.first('user_id',True).over(w2)))
      
      final_df.orderBy('cookie_id', 'c_date').show(truncate=False)
      
         +---------+----------+-------+------------+
      |cookie_id|c_date    |user_id|UserIDFilled|
      +---------+----------+-------+------------+
      |1        |2015-12-01|null   |U1          |
      |1        |2015-12-02|U1     |U1          |
      |1        |2015-12-02|U1     |U1          |
      |1        |2015-12-03|U2     |U2          |
      |1        |2015-12-04|null   |U2          |
      |1        |2015-12-05|null   |U2          |
      |2        |2015-12-02|U3     |U3          |
      |2        |2015-12-03|null   |U3          |
      |2        |2015-12-04|null   |U3          |
      |2        |2015-12-05|null   |U3          |
      +---------+----------+-------+------------+
      

      【讨论】:

        猜你喜欢
        • 2016-02-10
        • 1970-01-01
        • 2021-11-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-06-27
        • 1970-01-01
        • 2021-03-21
        相关资源
        最近更新 更多