【问题标题】:Evaluate nans slices size评估 nans 切片大小
【发布时间】:2019-10-12 22:40:51
【问题描述】:

我有一个 spark 数据框,其中有一列中有一些 null 值。我需要计算非空值之前的连续 null 值。

使用 numpy 我会做这样的事情(代码没有针对 numpy 进行优化,因为我试图在我的问题中不使用它):

import numpy as np

x = np.array([[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.], [5, 3.], 
              [6, None], [7, None], [8, 5.], [9, 2.], [10, None]])

def nan_count(l, n):
    assert n <= len(l) + 1
    assert n >= 0

    if n < 1 or l[n-1] is not None:
        return 0
    return nan_count(l, n-1) + 1

y = map(lambda i: nan_count(x[:,1], i), x[:,0])
res = np.concatenate([x, np.asarray(y).reshape(-1,1)], axis = 1)
res

所以输出看起来像这样:

Out[31]: [0, 1, 0, 0, 1, 0, 0, 1, 2, 0, 0]

现在,如果我有像 x 这样的 spark DataFrame:

x = sc.parallelize([[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
                    [5, 3.], [6, None], [7, None], [8, 5.], [9, 2.], [10, None]])\
      .toDF()
x.show()
+---+----+
| _1|  _2|
+---+----+
|  0|null|
|  1| 3.0|
|  2| 7.0|
|  3|null|
|  4| 4.0|
|  5| 3.0|
|  6|null|
|  7|null|
|  8| 5.0|
|  9| 2.0|
| 10|null|
+---+----+

如何获得相同的输出?

我已经尝试了一些使用udf 的解决方法,但是我在引用选择之前的值时遇到了问题(我尝试在udf 中使用selectfilter pyspark.sql.dataframe.DataFrame 方法,但它不是允许)。

编辑:我不知道我能找到多少个连续的nans

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql user-defined-functions


    【解决方案1】:

    我将 cmets 放入代码中,以解释每个步骤,直到达到所需的输出。

    当然,没有必要从下面的示例中创建所有列,并且可能此代码可以得到很大改进,但我认为重要的是逐步向您展示并开始解决您的问题。

    x = sc.parallelize([
        [0, None],
        [1, 3.],
        [2, 7.],
        [3, None],
        [4, 4.],
        [5, 3.],
        [6, None],
        [7, None],
        [8, 5.],
        [9, 2.],
        [10, None]
    ])
    # Assigned values ​​in columns A and B to facilitate manipulation
    x = x.toDF(['A', 'B'])
    
    # Prints initial DF
    x.show()
    

    输出:

    +---+----+
    |  A|   B|
    +---+----+
    |  0|null|
    |  1| 3.0|
    |  2| 7.0|
    |  3|null|
    |  4| 4.0|
    |  5| 3.0|
    |  6|null|
    |  7|null|
    |  8| 5.0|
    |  9| 2.0|
    | 10|null|
    +---+----+
    
    # Transform null values into "1"
    x = x.withColumn('C', when(x.B.isNull(), 1))
    x.show()
    

    输出:

    +---+----+----+
    |  A|   B|   C|
    +---+----+----+
    |  0|null|   1|
    |  1| 3.0|null|
    |  2| 7.0|null|
    |  3|null|   1|
    |  4| 4.0|null|
    |  5| 3.0|null|
    |  6|null|   1|
    |  7|null|   1|
    |  8| 5.0|null|
    |  9| 2.0|null|
    | 10|null|   1|
    +---+----+----+
    
    # Creates a spec that order column A
    order_spec = Window().orderBy('A')
    
    # Doing a cumulative sum. See the explanation
    # https://stackoverflow.com/questions/56384625/pyspark-cumulative-sum-with-reset-condition
    x = x \
        .withColumn('tmp', sum((x.C.isNull()).cast('int')).over(order_spec)) \
        .withColumn('D', sum(x.C).over(order_spec.partitionBy("tmp"))) \
        .drop('tmp')
    x.show()
    

    输出:

    +---+----+----+----+
    |  A|   B|   C|   D|
    +---+----+----+----+
    |  0|null|   1|   1|
    |  1| 3.0|null|null|
    |  2| 7.0|null|null|
    |  3|null|   1|   1|
    |  4| 4.0|null|null|
    |  5| 3.0|null|null|
    |  6|null|   1|   1|
    |  7|null|   1|   2|
    |  8|null|   1|   3|
    |  9| 5.0|null|null|
    | 10| 2.0|null|null|
    | 11|null|   1|   1|
    +---+----+----+----+
    
    # Put values from column D to one row above and select the desired output values
    x = x.withColumn('E', lag(x.D, ).over(order_spec)) \
        .select(x.A, x.B, when(col('E').isNotNull(), col('E')).otherwise(0).alias('nan_count'))
    x.show()
    

    输出:

    +---+----+---------+
    |  A|   B|nan_count|
    +---+----+---------+
    |  0|null|        0|
    |  1| 3.0|        1|
    |  2| 7.0|        0|
    |  3|null|        0|
    |  4| 4.0|        1|
    |  5| 3.0|        0|
    |  6|null|        0|
    |  7|null|        1|
    |  8|null|        2|
    |  9| 5.0|        3|
    | 10| 2.0|        0|
    | 11|null|        0|
    +---+----+---------+
    

    完整代码:

    from pyspark.shell import sc
    from pyspark.sql import Window
    from pyspark.sql.functions import lag, when, sum, col
    
    x = sc.parallelize([
        [0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
        [5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
    x = x.toDF(['A', 'B'])
    
    # Transform null values into "1"
    x = x.withColumn('C', when(x.B.isNull(), 1))
    
    # Creates a spec that order column A
    order_spec = Window().orderBy('A')
    
    # Doing a cumulative sum with reset condition. See the explanation
    # https://stackoverflow.com/questions/56384625/pyspark-cumulative-sum-with-reset-condition
    x = x \
        .withColumn('tmp', sum((x.C.isNull()).cast('int')).over(order_spec)) \
        .withColumn('D', sum(x.C).over(order_spec.partitionBy("tmp"))) \
        .drop('tmp')
    
    # Put values from column D to one row above and select the desired output values
    x = x.withColumn('E', lag(x.D, ).over(order_spec)) \
        .select(x.A, x.B, when(col('E').isNotNull(), col('E')).otherwise(0).alias('nan_count'))
    x.show()
    

    【讨论】:

    • 它部分有效:我忘了说我可能有两个以上连续的nans:你发送的代码是正确的,直到连续两个nans,如果还有更多它只是写@987654332 @。我认为问题出在代码的sum_between 行,对吗?
    • 是的,但那是一个虚拟数据集,我有一个来自传感器的很长的时间序列,它碰巧错过了一些读数,并缓冲了这些值。我的序列中可能有长于两个的空值,即[…,1.5, nan, nan, nan, nan, nan, 6.7, ...]
    猜你喜欢
    • 2017-04-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-04
    • 2020-05-04
    • 2015-04-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多