【问题标题】:Renaming columns for PySpark DataFrame aggregates重命名 PySpark DataFrame 聚合的列
【发布时间】:2015-07-11 08:51:12
【问题描述】:

我正在使用 PySpark DataFrames 分析一些数据。假设我有一个要聚合的 DataFrame df

(df.groupBy("group")
   .agg({"money":"sum"})
   .show(100)
)

这会给我:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

聚合工作正常,但我不喜欢新的列名SUM(money#2L)。有没有办法将此列重命名为.agg 方法中人类可读的名称?也许更类似于dplyr

df %>% group_by(group) %>% summarise(sum_money = sum(money))

【问题讨论】:

    标签: dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    withColumnRenamed 应该可以解决问题。这是pyspark.sql API的链接。

    df.groupBy("group")\
      .agg({"money":"sum"})\
      .withColumnRenamed("SUM(money)", "money")
      .show(100)
    

    【讨论】:

    • alias 是一个很好的指针,但这是正确的答案 - 有时有充分的理由在 agg 中使用字典,这似乎是“别名”聚合的唯一方法列是重命名它。
    【解决方案2】:

    虽然我还是更喜欢 dplyr 语法,但这段代码 sn-p 就可以了:

    import pyspark.sql.functions as sf
    
    (df.groupBy("group")
       .agg(sf.sum('money').alias('money'))
       .show(100))
    

    它变得冗长。

    【讨论】:

    • 对于已复制粘贴此alias 部分但没有看到它生效的任何其他人,请注意括号。 alias('string') 存在于 内部 agg,否则您将别名整个 DataFrame 而不仅仅是列。
    【解决方案3】:

    我为此做了一个小助手函数,可能会帮助一些人。

    import re
    
    from functools import partial
    
    def rename_cols(agg_df, ignore_first_n=1):
        """changes the default spark aggregate names `avg(colname)` 
        to something a bit more useful. Pass an aggregated dataframe
        and the number of aggregation columns to ignore.
        """
        delimiters = "(", ")"
        split_pattern = '|'.join(map(re.escape, delimiters))
        splitter = partial(re.split, split_pattern)
        split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
        renamed = map(split_agg, agg_df.columns[ignore_first_n:])
        renamed = zip(agg_df.columns[ignore_first_n:], renamed)
        for old, new in renamed:
            agg_df = agg_df.withColumnRenamed(old, new)
        return agg_df
    

    一个例子:

    gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
     .groupby("id")
     .agg({"rank": "mean",
           "*": "count",
           "rate": "mean", 
           "price": "mean", 
           "clicks": "mean", 
           })
    )
    
    >>> gb.columns
    ['id',
     'avg(rate)',
     'count(1)',
     'avg(price)',
     'avg(rank)',
     'avg(clicks)']
    
    >>> rename_cols(gb).columns
    ['id',
     'avg_rate',
     'count_1',
     'avg_price',
     'avg_rank',
     'avg_clicks']
    
    

    至少做一些事情来避免人们打字太多。

    【讨论】:

    • 非常有用且及时。我正要问同样的问题。如果您可以在 agg 字典中指定一个新的列名(我的意思是在 Spark 中),那就太好了。
    • @EvanZamir 谢谢!我可能会为此尝试在 spark 中做一个简单的 PR。
    • 您可以简单地通过df = df.toDF(*newColumnNames) 重命名,其中newColumnNames 包含DataFrame (df) 的所有列名:)
    【解决方案4】:
    df = df.groupby('Device_ID').agg(aggregate_methods)
    for column in df.columns:
        start_index = column.find('(')
        end_index = column.find(')')
        if (start_index and end_index):
            df = df.withColumnRenamed(column, column[start_index+1:end_index])
    

    上面的代码可以去掉“()”之外的任何东西。例如,“sum(foo)”将被重命名为“foo”。

    【讨论】:

    • 请注意没有括号的列,它们将被一起删除,例如 groupby var。可以添加一个 if/continue 检查。我有一个变量是我的 groupby var,所以只需检查一下。
    【解决方案5】:

    很简单:

     val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
    maxVideoLenPerItemDf.show()
    

    在 agg 中使用 .as 来命名创建的新行。

    【讨论】:

    • 从 PySpark 2.4.0 开始,.as('new_name') 应该是 .alias('new_name')
    【解决方案6】:
    import findspark
    findspark.init()
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    spark = SparkSession.builder.appName('test').getOrCreate()
    data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
    schema = ['id', 'name', 'sallary']
    
    df = spark.createDataFrame(data, schema=schema)
    df.show()
    +---+-----+-------+
    | id| name|sallary|
    +---+-----+-------+
    |  1| siva|    100|
    |  2|siva2|    200|
    |  3|siva3|    300|
    |  4|siva4|    400|
    |  5|siva5|    500|
    +---+-----+-------+
    
    
    **df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
    +---+
    |max|
    +---+
    |500|
    +---+
    

    【讨论】:

      【解决方案7】:

      虽然之前给出的答案很好,但我认为他们缺乏一种巧妙的方法来处理 .agg() 中的字典用法

      如果你想使用一个字典,它实际上也可能是动态生成的,因为你有数百列,你可以使用以下内容而无需处理几十行代码:

      # Your dictionary-version of using the .agg()-function
      # Note: The provided logic could actually also be applied to a non-dictionary approach
      df = df.groupBy("group")\
         .agg({
                "money":"sum"
              , "...":  "..."
          })
      
      # Now do the renaming
      newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
      df = df.toDF(*newColumnNames)              # Do the renaming
      

      当然newColumnNames-list 也可以动态生成。例如,如果您只将聚合中的列附加到您的 df,您可以预先存储 newColumnNames = df.columns,然后只需附加其他名称。
      无论如何,请注意newColumnNames 必须包含数据帧的所有列名,而不仅仅是要重命名的那些(因为.toDF() 由于 Sparks 不可变 RDD 会创建一个新数据帧)!

      【讨论】:

        【解决方案8】:

        另一个快速的小班轮添加混合:

        df.groupBy('group')
          .agg({'money':'sum',
                'moreMoney':'sum',
                'evenMoreMoney':'sum'
                })
            .select(*(col(i).alias(i.replace("(",'_').replace(')','')) for i in df.columns))
        

        只需将别名函数更改为您想要命名的任何名称。以上生成 sum_money、sum_moreMoney,因为我确实喜欢在变量名中看到运算符。

        【讨论】:

          【解决方案9】:

          .alias.withColumnRenamed 如果您愿意对列名进行硬编码,它们都可以工作。如果您需要程序化解决方案,例如为所有剩余列的聚合提供更友好的名称,这提供了一个很好的起点:

          grouping_column = 'group'
          cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
          (
              df
              .groupBy(grouping_column)
              .agg(
                  *cols
              )
          )
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2016-09-23
            • 2016-12-23
            • 1970-01-01
            • 1970-01-01
            • 2021-12-24
            • 2017-05-30
            • 2022-01-17
            相关资源
            最近更新 更多