【问题标题】:Comparing columns in Pyspark比较 Pyspark 中的列
【发布时间】:2016-10-07 00:06:59
【问题描述】:

我正在研究具有 n 列的 PySpark DataFrame。我有一组 m 列 (m

例如:

输入:PySpark DataFrame 包含:

col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]

输出:

col_4 = max(col1, col_2, col_3) = [3,2,5]

this 问题中解释的熊猫中有类似的东西。

有没有办法在 PySpark 中执行此操作,或者我应该将我的 PySpark df 更改为 Pandas df 然后执行操作?

【问题讨论】:

  • 如果问题是关于获取每列的最大值,那么看起来预期的输出应该是 [max(col_1), max(col_2), max(col_3)] = [3, 4, 5]

标签: python apache-spark pyspark


【解决方案1】:

您可以在列列表上使用 SQL 表达式来减少:

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
    .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max")))

Spark 1.5+ 还提供leastgreatest

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

如果您想保留最大值的名称,您可以使用 `structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

 maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))

最后你可以使用上面找到选择“顶部”列:

from pyspark.sql.functions import max

((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max(struct(col("count"), col("col"))))
    .first())

df.select(c)

【讨论】:

  • 这很有帮助!你怎么找到第二大的呢?我想获取第二大列的名称
【解决方案2】:

我们可以使用greatest

创建数据帧

df = spark.createDataFrame(
    [[1,2,3], [2,1,2], [3,4,5]], 
    ['col_1','col_2','col_3']
)
df.show()
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    1|    2|    3|
|    2|    1|    2|
|    3|    4|    5|
+-----+-----+-----+

解决方案

from pyspark.sql.functions import greatest
df2 = df.withColumn('max_by_rows', greatest('col_1', 'col_2', 'col_3'))

#Only if you need col
#from pyspark.sql.functions import col
#df2 = df.withColumn('max', greatest(col('col_1'), col('col_2'), col('col_3')))
df2.show()

+-----+-----+-----+-----------+
|col_1|col_2|col_3|max_by_rows|
+-----+-----+-----+-----------+
|    1|    2|    3|          3|
|    2|    1|    2|          2|
|    3|    4|    5|          5|
+-----+-----+-----+-----------+

【讨论】:

    【解决方案3】:

    你也可以使用pyspark内置的least

    from pyspark.sql.functions import least, col
    df = df.withColumn('min', least(col('c1'), col('c2'), col('c3')))
    

    【讨论】:

    • 我认为 OP 想要与此相反。是否有等效的most 函数?
    • 啊,这是greatest - 请参阅下面的@ansev 答案
    【解决方案4】:

    另一种简单的方法。假设下面的df 是您的数据框

    df = sc.parallelize([(10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4)]).toDF(["c1", "c2", "c3"])
    df.show()
    
    +---+---+---+
    | c1| c2| c3|
    +---+---+---+
    | 10| 10|  1|
    |200|  2| 20|
    |  3| 30|300|
    |400| 40|  4|
    +---+---+---+
    

    您可以如下处理上述df以获得所需的结果

    from pyspark.sql.functions import lit, min
    
    df.select( lit('c1').alias('cn1'), min(df.c1).alias('c1'),
               lit('c2').alias('cn2'), min(df.c2).alias('c2'),
               lit('c3').alias('cn3'), min(df.c3).alias('c3')
              )\
             .rdd.flatMap(lambda r: [ (r.cn1, r.c1), (r.cn2, r.c2), (r.cn3, r.c3)])\
             .toDF(['Columnn', 'Min']).show()
    
    +-------+---+
    |Columnn|Min|
    +-------+---+
    |     c1|  3|
    |     c2|  2|
    |     c3|  1|
    +-------+---+
    

    【讨论】:

    • 你正在做 min(col1),而我想要 min(row1)、min(row2).. 等等......
    【解决方案5】:

    Scala 解决方案:

    df = sc.parallelize(Seq((10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4))).toDF("c1", "c2", "c3"))  
    
    df.rdd.map(row=>List[String](row(0).toString,row(1).toString,row(2).toString)).map(x=>(x(0),x(1),x(2),x.min)).toDF("c1","c2","c3","min").show    
    

    +---+---+---+---+  
    | c1| c2| c3|min|  
    +---+---+---+---+  
    | 10| 10|  1|  1|    
    |200|  2| 20|  2|  
    |  3| 30|300|  3|  
    |400| 40|  4|  4|  
    +---+---+---+---+  
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-06-17
      • 2020-01-30
      • 2017-07-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多