【问题标题】:Adding a group count column to a PySpark dataframe将组计数列添加到 PySpark 数据帧
【发布时间】:2018-07-25 09:51:23
【问题描述】:

我从 R 和 tidyverse 来到 PySpark,因为它具有出色的 Spark 处理能力,我正在努力将某些概念从一个上下文映射到另一个上下文。

特别是,假设我有一个如下所示的数据集

x | y
--+--
a | 5
a | 8
a | 7
b | 1

我想添加一列,其中包含每个 x 值的行数,如下所示:

x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1

在 dplyr 中,我只想说:

import(tidyverse)

df <- read_csv("...")
df %>%
    group_by(x) %>%
    mutate(n = n()) %>%
    ungroup()

就是这样。如果我想按行数总结,我可以在 PySpark 中做几乎一样简单的事情:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .count() \
    .show()

我以为我理解withColumn 等同于dplyr 的mutate。但是,当我执行以下操作时,PySpark 告诉我 withColumn 没有为 groupBy 数据定义:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .withColumn("n", count("x")) \
    .show()

在短期内,我可以简单地创建第二个包含计数的数据帧并将其连接到原始数据帧。但是,在大型表的情况下,这似乎会变得低效。实现此目的的规范方法是什么?

【问题讨论】:

  • @pault 的出色回答。正如他回答的第一句话所说:“您必须先指定聚合,然后才能显示结果”。我认为 OP 试图避免 count(),将其视为一种操作。 count() 的一个关键理论点是: * 如果 count() 直接在 DF 上调用,则它是一个 Action * 但如果 count() 在 groupby() 之后调用,则 count() 应用于groupedDataSet 而不是 DF,count() 变成了转换而不是动作。

标签: apache-spark pyspark dplyr


【解决方案1】:

我发现我们可以更接近 tidyverse 示例:

from pyspark.sql import Window
w = Window.partitionBy('x')
df.withColumn('n', f.count('x').over(w)).sort('x', 'y').show()

【讨论】:

    【解决方案2】:

    作为@pault 附录

    import pyspark.sql.functions as F
    
    ...
    
    (df
    .groupBy(F.col('x'))
    .agg(F.count('x').alias('n'))
    .show())
    
    #+---+---+
    #|  x|  n|
    #+---+---+
    #|  b|  1|
    #|  a|  3|
    #+---+---+
    

    享受

    【讨论】:

      【解决方案3】:

      当您执行groupBy() 时,您必须先指定聚合,然后才能显示结果。例如:

      import pyspark.sql.functions as f
      data = [
          ('a', 5),
          ('a', 8),
          ('a', 7),
          ('b', 1),
      ]
      df = sqlCtx.createDataFrame(data, ["x", "y"])
      df.groupBy('x').count().select('x', f.col('count').alias('n')).show()
      #+---+---+
      #|  x|  n|
      #+---+---+
      #|  b|  1|
      #|  a|  3|
      #+---+---+
      

      这里我使用alias() 重命名列。但这只会返回每组一行。如果您希望所有行都附加计数,您可以使用Window

      from pyspark.sql import Window
      w = Window.partitionBy('x')
      df.select('x', 'y', f.count('x').over(w).alias('n')).sort('x', 'y').show()
      #+---+---+---+
      #|  x|  y|  n|
      #+---+---+---+
      #|  a|  5|  3|
      #|  a|  7|  3|
      #|  a|  8|  3|
      #|  b|  1|  1|
      #+---+---+---+
      

      或者,如果您更熟悉 SQL,您可以将数据框注册为临时表,并利用 pyspark-sql 做同样的事情:

      df.registerTempTable('table')
      sqlCtx.sql(
          'SELECT x, y, COUNT(x) OVER (PARTITION BY x) AS n FROM table ORDER BY x, y'
      ).show()
      #+---+---+---+
      #|  x|  y|  n|
      #+---+---+---+
      #|  a|  5|  3|
      #|  a|  7|  3|
      #|  a|  8|  3|
      #|  b|  1|  1|
      #+---+---+---+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-04-19
        • 2021-03-19
        • 2022-08-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-05-19
        • 1970-01-01
        相关资源
        最近更新 更多