[Title]: Pyspark aggregation
[Posted]: 2019-08-26 09:12:07
[Question]:

I am trying to aggregate a pyspark dataframe. A sample looks like this:

+---+-------------------+
| id|             struct|
+---+-------------------+
|id1|  [foo, true, true]|
|id1| [foo, true, false]|
|id1|[foo, false, false]|
|id1|  [bar, true, true]|
|id1| [bar, true, false]|
|id1|[bar, false, false]|
|id2|  [foo, true, true]|
|id2|[foo, false, false]|
|id2|  [bar, true, true]|
|id2|[bar, false, false]|
+---+-------------------+

The id column has up to 1500 unique IDs, and struct.name will have 5 unique values.

Here is my code, which computes what I want:

from pyspark.sql.types import *
from shared.spark import start_spark
import pyspark.sql.functions as F
spark = start_spark('app')
schema = StructType([StructField('id', StringType()),
                     StructField('struct', StructType(
                         [StructField('name', StringType()),
                          StructField('param1', BooleanType()),
                          StructField('param2', BooleanType()),
                          ]
                     ))])
data = [['id1', ['foo', True, True]],
        ['id1', ['foo', True, False]],
        ['id1', ['foo', False, False]],
        ['id1', ['bar', True, True]],
        ['id1', ['bar', True, False]],
        ['id1', ['bar', False, False]],
        ['id2', ['foo', True, True]],
        ['id2', ['foo', False, False]],
        ['id2', ['bar', True, True]],
        ['id2', ['bar', False, False]]
        ]
df = spark.createDataFrame(data, schema)

df.groupby('id')\
    .agg(F.count(F.when((df['struct.name'] == 'foo') &
                        (df['struct.param1']) &
                        (df['struct.param2']), 1)).alias('foo_cond1'),
         F.count(F.when((df['struct.name'] == 'foo') &
                        (df['struct.param1']) &
                        (~df['struct.param2']), 1)).alias('foo_cond2'),
         F.count(F.when((df['struct.name'] == 'foo') &
                        (~df['struct.param1']) &
                        (~df['struct.param2']), 1)).alias('foo_cond3'),
         F.count(F.when((df['struct.name'] == 'bar') &
                        (df['struct.param1']) &
                        (df['struct.param2']), 1)).alias('bar_cond1'),
         F.count(F.when((df['struct.name'] == 'bar') &
                        (df['struct.param1']) &
                        (~df['struct.param2']), 1)).alias('bar_cond2'),
         F.count(F.when((df['struct.name'] == 'bar') &
                        (~df['struct.param1']) &
                        (~df['struct.param2']), 1)).alias('bar_cond3'),
         ) \
    .withColumn('foo', F.struct(F.col('foo_cond1').alias('cond1'),
                                F.col('foo_cond2').alias('cond2'),
                                F.col('foo_cond3').alias('cond3')
                                )
                ) \
    .withColumn('bar', F.struct(F.col('bar_cond1').alias('cond1'),
                                F.col('bar_cond2').alias('cond2'),
                                F.col('bar_cond3').alias('cond3')
                                )
                ) \
    .select('id', 'foo', 'bar') \
    .show()

The result looks like this:

+---+---------+---------+
| id|      foo|      bar|
+---+---------+---------+
|id1|[1, 1, 1]|[1, 1, 1]|
|id2|[1, 0, 1]|[1, 0, 1]|
+---+---------+---------+

Is there a better way to do this aggregation, with less code and better performance? Maybe using a pandas UDAF? I appreciate every comment. Thanks.
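Whatever approach is used, the expected counts can be sanity-checked with plain Python over the sample rows above (no Spark needed). This is just a reference computation mirroring the three conditions, not part of the pipeline:

```python
from collections import Counter

# sample rows as (id, name, param1, param2), mirroring the dataframe above
rows = [
    ('id1', 'foo', True, True), ('id1', 'foo', True, False), ('id1', 'foo', False, False),
    ('id1', 'bar', True, True), ('id1', 'bar', True, False), ('id1', 'bar', False, False),
    ('id2', 'foo', True, True), ('id2', 'foo', False, False),
    ('id2', 'bar', True, True), ('id2', 'bar', False, False),
]

# cond1: param1 & param2; cond2: param1 & not param2; cond3: not param1 & not param2
# ((False, True) never occurs in the sample, so it is folded into cond3 here)
def cond(p1, p2):
    return 'cond1' if p1 and p2 else ('cond2' if p1 else 'cond3')

counts = Counter((i, n, cond(p1, p2)) for i, n, p1, p2 in rows)
expected = {(i, n): [counts[(i, n, c)] for c in ('cond1', 'cond2', 'cond3')]
            for i, n, _, _ in rows}
print(expected[('id1', 'foo')])  # [1, 1, 1]
print(expected[('id2', 'foo')])  # [1, 0, 1]
```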

[Question discussion]:

    Tags: pyspark


    [Solution 1]:

    I was able to use PandasUDFType, but the runtime seems to have increased by more than 30%. However, I have only used the sample data mentioned above.

    from pyspark.sql.types import *
    from shared.spark import start_spark
    import pyspark.sql.functions as F
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    spark = start_spark('app')
    schema = StructType([StructField('id', StringType()),
                         StructField('struct', StructType(
                             [StructField('name', StringType()),
                              StructField('param1', BooleanType()),
                              StructField('param2', BooleanType()),
                              ]
                         ))])
    schema_udf = StructType(
                     [StructField('id', StringType()),
                      StructField('foo1', DoubleType()),
                      StructField('foo2', DoubleType()),
                      StructField('foo3', DoubleType()),
                      StructField('bar1', DoubleType()),
                      StructField('bar2', DoubleType()),
                      StructField('bar3', DoubleType()),
                      ])
    data = [['id1', ['foo', True, True]],
            ['id1', ['foo', True, False]],
            ['id1', ['foo', False, False]],
            ['id1', ['bar', True, True]],
            ['id1', ['bar', True, False]],
            ['id1', ['bar', False, False]],
            ['id2', ['foo', True, True]],
            ['id2', ['foo', False, False]],
            ['id2', ['bar', True, True]],
            ['id2', ['bar', False, False]]
            ]
    df = spark.createDataFrame(data, schema)
    
    @pandas_udf(schema_udf, PandasUDFType.GROUPED_MAP)
    def myGroupby(df_group):
        def countComb(row):
            # map one row's (param1, param2) pair onto the three condition counters
            def countCombinations(param1, param2):
                cond1, cond2, cond3 = 0, 0, 0
                if param1:
                    if param2:
                        cond1 += 1
                    else:
                        cond2 += 1
                else:
                    cond3 += 1
                return cond1, cond2, cond3
            if row['name'] == 'foo':
                row['foo1'], row['foo2'], row['foo3'] = countCombinations(row.param1, row.param2)
            if row['name'] == 'bar':
                row['bar1'], row['bar2'], row['bar3'] = countCombinations(row.param1, row.param2)
            return row
        # apply row-wise, then sum the per-row indicator columns for each id
        df_result = df_group.apply(countComb, axis=1)
        return df_result[['id', 'foo1', 'foo2', 'foo3', 'bar1', 'bar2', 'bar3']].groupby('id').sum().reset_index()
    
    
    df \
        .select('id', 'struct.name', 'struct.param1', 'struct.param2') \
        .groupby("id") \
        .apply(myGroupby) \
        .withColumn('foo', F.struct(F.col('foo1').alias('cond1'),
                                    F.col('foo2').alias('cond2'),
                                    F.col('foo3').alias('cond3')
                                    )
                    ) \
        .withColumn('bar', F.struct(F.col('bar1').alias('cond1'),
                                    F.col('bar2').alias('cond2'),
                                    F.col('bar3').alias('cond3')
                                    )
                    ) \
        .select('id', 'foo', 'bar') \
        .show()
    

    So what is the best practice for writing pyspark aggregations? If I want to aggregate over many columns, is it better to write many conditions (as in the original question) or to split the aggregation into smaller parts and then join the dataframes?
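One way to cut down the repetition in the original question (a sketch, not benchmarked) is to generate the per-name, per-condition aggregation expressions in a loop as SQL strings and hand them all to a single agg call via F.expr. The names and condition labels below come from the question; the string-building helper itself is illustrative:

```python
# build one count(CASE WHEN ...) expression per (name, condition) pair
names = ['foo', 'bar']
conditions = [
    ('cond1', 'struct.param1 AND struct.param2'),
    ('cond2', 'struct.param1 AND NOT struct.param2'),
    ('cond3', 'NOT struct.param1 AND NOT struct.param2'),
]
exprs = [
    f"count(CASE WHEN struct.name = '{n}' AND {c} THEN 1 END) AS {n}_{label}"
    for n in names
    for label, c in conditions
]
print(exprs[0])
# → count(CASE WHEN struct.name = 'foo' AND struct.param1 AND struct.param2 THEN 1 END) AS foo_cond1

# with a SparkSession and the df from the question this becomes:
# import pyspark.sql.functions as F
# df.groupby('id').agg(*[F.expr(e) for e in exprs])
```

The two withColumn('foo', F.struct(...)) steps from the question would still be needed afterwards to pack the six counts back into structs.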

    [Discussion]:

      [Solution 2]:

      How about this?

      1. Split the struct into separate columns and compute the counts with a window.
      from pyspark.sql.types import *
      from pyspark.sql.window import Window
      import pyspark.sql.functions as F
      schema = StructType([StructField('id', StringType()),
                           StructField('struct', StructType(
                               [StructField('name', StringType()),
                                StructField('param1', BooleanType()),
                                StructField('param2', BooleanType()),
                                ]
                           ))])
      data = [['id1', ['foo', True, True]],
              ['id1', ['foo', True, False]],
              ['id1', ['foo', False, False]],
              ['id1', ['bar', True, True]],
              ['id1', ['bar', True, False]],
              ['id1', ['bar', False, False]],
              ['id2', ['foo', True, True]],
              ['id2', ['foo', False, False]],
              ['id2', ['bar', True, True]],
              ['id2', ['bar', False, False]]
              ]
      df = spark.createDataFrame(data, schema)
      df = df.withColumn('name', F.col('struct').getField('name'))
      df = df.withColumn('param1', F.col('struct').getField('param1'))
      df = df.withColumn('param2', F.col('struct').getField('param2'))
      w = Window.partitionBy(['id', 'name'])
      df = df.withColumn('c1', F.count(F.when(df['param1'] & df['param2'], 1)).over(w))
      df = df.withColumn('c2', F.count(F.when(df['param1'] & ~df['param2'], 1)).over(w))
      df = df.withColumn('c3', F.count(F.when(~df['param1'] & ~df['param2'], 1)).over(w))
      df = df.withColumn('result', F.array(['c1', 'c2', 'c3']))
      df.show()
      
      +---+-------------------+----+------+------+---+---+---+---------+
      | id|             struct|name|param1|param2| c1| c2| c3|   result|
      +---+-------------------+----+------+------+---+---+---+---------+
      |id2|  [bar, true, true]| bar|  true|  true|  1|  0|  1|[1, 0, 1]|
      |id2|[bar, false, false]| bar| false| false|  1|  0|  1|[1, 0, 1]|
      |id1|  [foo, true, true]| foo|  true|  true|  1|  1|  1|[1, 1, 1]|
      |id1|[foo, false, false]| foo| false| false|  1|  1|  1|[1, 1, 1]|
      |id1| [foo, true, false]| foo|  true| false|  1|  1|  1|[1, 1, 1]|
      |id1|  [bar, true, true]| bar|  true|  true|  1|  1|  1|[1, 1, 1]|
      |id1| [bar, true, false]| bar|  true| false|  1|  1|  1|[1, 1, 1]|
      |id1|[bar, false, false]| bar| false| false|  1|  1|  1|[1, 1, 1]|
      |id2|[foo, false, false]| foo| false| false|  1|  0|  1|[1, 0, 1]|
      |id2|  [foo, true, true]| foo|  true|  true|  1|  0|  1|[1, 0, 1]|
      +---+-------------------+----+------+------+---+---+---+---------+
      
      

      2. Then we use pivot:

      df = df.groupby('id').pivot('name').agg(F.first('result'))
      df.show()
      
      +---+---------+---------+
      | id|      bar|      foo|
      +---+---------+---------+
      |id1|[1, 1, 1]|[1, 1, 1]|
      |id2|[1, 0, 1]|[1, 0, 1]|
      +---+---------+---------+
      
      

      [Discussion]:

      • Nice solution with less code, but it still seems slower than my original one. I have only tried it on my local machine, though; I'll run it on the full dataset on a cluster and see.