[Posted]: 2019-08-26 09:12:07
[Question]:
I am trying to aggregate a PySpark DataFrame. A sample looks like this:
+---+-------------------+
| id| struct|
+---+-------------------+
|id1| [foo, true, true]|
|id1| [foo, true, false]|
|id1|[foo, false, false]|
|id1| [bar, true, true]|
|id1| [bar, true, false]|
|id1|[bar, false, false]|
|id2| [foo, true, true]|
|id2|[foo, false, false]|
|id2| [bar, true, true]|
|id2|[bar, false, false]|
+---+-------------------+
The id column has up to 1500 unique IDs, and struct.name has 5 unique values.
Here is my code, which computes what I want:
from pyspark.sql.types import *
from shared.spark import start_spark
import pyspark.sql.functions as F

spark = start_spark('app')

schema = StructType([StructField('id', StringType()),
                     StructField('struct', StructType(
                         [StructField('name', StringType()),
                          StructField('param1', BooleanType()),
                          StructField('param2', BooleanType()),
                          ]))])
data = [['id1', ['foo', True, True]],
        ['id1', ['foo', True, False]],
        ['id1', ['foo', False, False]],
        ['id1', ['bar', True, True]],
        ['id1', ['bar', True, False]],
        ['id1', ['bar', False, False]],
        ['id2', ['foo', True, True]],
        ['id2', ['foo', False, False]],
        ['id2', ['bar', True, True]],
        ['id2', ['bar', False, False]]
        ]
df = spark.createDataFrame(data, schema)
df.groupby('id') \
  .agg(F.count(F.when((df['struct.name'] == 'foo') &
                      (df['struct.param1']) &
                      (df['struct.param2']), 1)).alias('foo_cond1'),
       F.count(F.when((df['struct.name'] == 'foo') &
                      (df['struct.param1']) &
                      (df['struct.param2'] == False), 1)).alias('foo_cond2'),
       F.count(F.when((df['struct.name'] == 'foo') &
                      (df['struct.param1'] == False) &
                      (df['struct.param2'] == False), 1)).alias('foo_cond3'),
       F.count(F.when((df['struct.name'] == 'bar') &
                      (df['struct.param1']) &
                      (df['struct.param2']), 1)).alias('bar_cond1'),
       F.count(F.when((df['struct.name'] == 'bar') &
                      (df['struct.param1']) &
                      (df['struct.param2'] == False), 1)).alias('bar_cond2'),
       F.count(F.when((df['struct.name'] == 'bar') &
                      (df['struct.param1'] == False) &
                      (df['struct.param2'] == False), 1)).alias('bar_cond3'),
       ) \
  .withColumn('foo', F.struct(F.col('foo_cond1').alias('cond1'),
                              F.col('foo_cond2').alias('cond2'),
                              F.col('foo_cond3').alias('cond3'))) \
  .withColumn('bar', F.struct(F.col('bar_cond1').alias('cond1'),
                              F.col('bar_cond2').alias('cond2'),
                              F.col('bar_cond3').alias('cond3'))) \
  .select('id', 'foo', 'bar') \
  .show()
The result looks like this:
+---+---------+---------+
| id| foo| bar|
+---+---------+---------+
|id1|[1, 1, 1]|[1, 1, 1]|
|id2|[1, 0, 1]|[1, 0, 1]|
+---+---------+---------+
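As a plain-Python sanity check of the expected output, the same counts can be reproduced over the raw `data` list without Spark. This is only a sketch for verification, not a replacement for the Spark job; the `conds` dict mirrors the three boolean predicates that the six `F.when(...)` branches encode.

```python
from collections import defaultdict

# The same rows as in the question, as plain Python lists.
data = [['id1', ['foo', True, True]],
        ['id1', ['foo', True, False]],
        ['id1', ['foo', False, False]],
        ['id1', ['bar', True, True]],
        ['id1', ['bar', True, False]],
        ['id1', ['bar', False, False]],
        ['id2', ['foo', True, True]],
        ['id2', ['foo', False, False]],
        ['id2', ['bar', True, True]],
        ['id2', ['bar', False, False]]]

# The three boolean predicates used by the six F.when(...) branches above.
conds = {
    'cond1': lambda p1, p2: p1 and p2,
    'cond2': lambda p1, p2: p1 and not p2,
    'cond3': lambda p1, p2: not p1 and not p2,
}

# counts[(id, name)][cond_label] -> number of matching rows
counts = defaultdict(lambda: {c: 0 for c in conds})
for row_id, (name, p1, p2) in data:
    for label, pred in conds.items():
        if pred(p1, p2):
            counts[(row_id, name)][label] += 1

print(counts[('id1', 'foo')])  # {'cond1': 1, 'cond2': 1, 'cond3': 1}
print(counts[('id2', 'bar')])  # {'cond1': 1, 'cond2': 0, 'cond3': 1}
```

The output matches the table above, which also suggests a way to shorten the Spark code: the six `F.count(F.when(...))` expressions could be generated in a nested loop over the names and these three predicates instead of being written out by hand.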
Is there a better way to do this aggregation with less code and better performance? Maybe using a pandas UDAF? I appreciate every comment. Thanks.
[Discussion]:
Tags: pyspark