【问题标题】:calculate the sum and countDistinct after groupby in PySpark在 PySpark 中 groupby 之后计算 sum 和 countDistinct
【发布时间】:2021-11-11 07:36:06
【问题描述】:

我有一个 PySpark 数据框,想按几列分组,然后计算某些列的总和并计算另一列的不同值。由于countDistinct 不是内置聚合函数,所以我不能使用我在这里尝试过的简单表达式:

sum_cols = ['a', 'b']
count_cols = ['id']
exprs1 = {x: "sum" for x in sum_cols}
exprs2 = {x: "countDistinct" for x in count_cols}
exprs = {**exprs1, **exprs2}

df_aggregated = df.groupby('month','product').agg(exprs)

我也尝试了this answer 中的方法exprs2 = [countDistinct(x) for x in count_cols],但是当我只为聚合列尝试AssertionError: all exprs should be Column 时收到错误消息。

如何在一个聚合中结合 sum 和 count distinct?我知道,我可以用sum 列和countDistinct 列做一次,然后加入两个数据框,但应该有一个解决方案可以一步完成...

【问题讨论】:

    标签: python pyspark group-by aggregate-functions distinct


    【解决方案1】:

    不要使用agg 的字典版本,而是使用带有列列表的版本:

    from pyspark.sql import functions as F
    df = ...
    exprs1 = [F.sum(c) for c in sum_cols]
    exprs2 = [F.countDistinct(c) for c in count_cols]
    
    df_aggregated = df.groupby('month_product').agg(*(exprs1+exprs2))
    

    如果你想保持当前的逻辑,你可以切换到approx_count_distinct。与countDistinct 不同,此函数可作为 SQL 函数使用。

    【讨论】:

      【解决方案2】:

      不确定为什么必须使用expr,但正常的聚合应该可以工作。 countDistinct 是一个聚合函数。

      (df
          .groupBy('month','product')
          .agg(
              F.sum('a', 'b'),
              F.countDistinct('id')
          )
      ).show()
      
      # +----+-----------+-------------+
      # |name|sum(field1)|count(field1)|
      # +----+-----------+-------------+
      # |   d|          0|            1|
      # |   c|         10|            1|
      # |   b|          5|            1|
      # |   a|          4|            1|
      # +----+-----------+-------------+
      

      【讨论】:

      • 我没有提到,我有大约 20 列以相同的字符串开头,我需要求和,另外 x 列需要计算(不同的值)
      • 同样的,你可以通过F.sum(sum_cols)F. countDistinct(count_cols)
      猜你喜欢
      • 2021-09-29
      • 1970-01-01
      • 2018-03-07
      • 1970-01-01
      • 2016-02-04
      • 2016-10-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多