【问题标题】:PySpark: compute row maximum of the subset of columns and add to an exisiting dataframePySpark:计算列子集的最大行数并添加到现有数据帧
【发布时间】:2016-11-29 19:54:34
【问题描述】:

我想计算每一行的列子集的最大值并将其添加为现有Dataframe 的新列。

我设法以非常尴尬的方式做到了这一点:

def add_colmax(df,subset_columns,colnm):
     '''
     calculate the maximum of the selected "subset_columns" from dataframe df for each row, 
     new column containing row wise maximum is added to dataframe df. 

     df: dataframe. It must contain subset_columns as subset of columns
     colnm: Name of the new column containing row-wise maximum of subset_columns
     subset_columns: the subset of columns from w
     '''
     from pyspark.sql.functions import monotonicallyIncreasingId
     from pyspark.sql import Row
     def get_max_row_with_None(row):
         return float(np.max(row))

     df_subset = df.select(subset_columns)
     rdd = df_subset.map( get_max_row_with_None)
     df_rowsum = rdd.map(Row(colnm)).toDF()
     df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
     df = df.withColumn("id",monotonicallyIncreasingId())
     df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
     return df

这个函数的作用是:

rdd1 =  sc.parallelize([("foo", 1.0,3.0,None), 
                    ("bar", 2.0,2.0,-10), 
                    ("baz", 3.3,1.2,10.0)])


df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")   
df_new.collect()

返回:

 [Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
  Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
  Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]

我认为如果我可以使用withColumn 的用户定义函数,这可以更简单地完成。但我不知道该怎么做。 如果您有更简单的方法来实现这一点,请告诉我。 我正在使用 Spark 1.6

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    让我们从几个导入开始

    from pyspark.sql.functions import col, lit, coalesce, greatest
    

    接下来定义负无穷大字面量:

    minf = lit(float("-inf"))
    

    映射列并将结果传递给greatest

    rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])
    

    终于withColumn:

    df1.withColumn("rowmax", rowmax)
    

    结果:

    +---+---+---+----+------+
    | v1| v2| v3|  v4|rowmax|
    +---+---+---+----+------+
    |foo|1.0|3.0|null|   3.0|
    |bar|2.0|2.0| -10|   2.0|
    |baz|3.3|1.2|null|   3.3|
    +---+---+---+----+------+
    

    您可以将相同的模式与不同的行操作替换 minf 与中性元素。例如:

    rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])
    

    或:

    from operator import mul
    from functools import reduce
    
    rowproduct = reduce(
      mul, 
      [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
    )
    

    udf 可以显着简化您自己的代码:

    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import udf
    
    def get_max_row_with_None_(*cols):
        return float(max(x for x in cols if x is not None))
    
    get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
    df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))
    

    minf 替换为lit(float("inf")),将greatest 替换为least,以获得每行的最小值。

    【讨论】:

    • 拜托,再举一个比这更简单的例子……
    猜你喜欢
    • 2018-07-25
    • 2020-10-26
    • 1970-01-01
    • 1970-01-01
    • 2018-09-17
    • 2019-12-10
    • 2022-01-24
    • 1970-01-01
    • 2016-01-25
    相关资源
    最近更新 更多