【问题标题】:PySpark aggregation and complex schemaPySpark 聚合和复杂模式
【发布时间】:2018-05-17 01:52:42
【问题描述】:

我有一个像这样的 Spark 数据框 (df1):

deviceid   host      count 
a.b.c.d   0.0.0.0     1
a.b.c.d   1.1.1.1     3
x.y.z     0.0.0.0     2

我想把它转换成这样的新数据框

deviceid   hosts_counts   
a.b.c.d    [(0.0.0.0,1),(1.1.1.1,3)]
x.y.z      [(0.0.0.0,2)]

我试过的是这样的:

def convertTuple(*data): 
    for k,v in data: 
        return k[0], (k[1],v)  

df2 = df1.map(convertTuple) # zip host and count 

然后:

function countReducer(a,b): 
    return a + b
df3 = df2.reduceByKey(countReducer)

但是,这给了我一个这样的数据框,我不知道下一步如何实现我的最终目标:

编辑

我设法使用groupbycollect_list 来解决这个问题。棘手的部分是为了在(host,count) 元组上进行聚合,您需要创建一个strcut。这是代码:

df = df1.groupby("deviceid").agg(collect_list(struct("domain","count")).alias("domain_count"))

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    问题是你将元组连接在一起,countReducer 不会给你一个元组列表。在 Python 中:

    (1,2) + (3,4) = (1,2,3,4)
    

    您可以做的是将元组转换为元组列表(带有单个元素)。可以使用map

    .map(lambda x: (x[0], [ x[1] ]))
    

    但在这种情况下,最好更改 convertTuple 函数以返回您想要的内容:

    def convertTuple(*data): 
        for k,v in data: 
            return k[0], [(k[1],v)]
    

    附带说明一下,您似乎使用的是 RDD 而不是数据帧。如果您不使用旧的 Spark 版本,我建议您考虑更改为数据帧,因为它们更容易使用。

    【讨论】:

    • 感谢您对转换部分的建议。就 RDD/dataframe 而言,我确实使用了 dataframe。我实际上设法使用 collect_list 来实现相同的目标。我会用我的解决方案更新我的帖子。
    猜你喜欢
    • 2019-05-30
    • 1970-01-01
    • 2019-11-04
    • 1970-01-01
    • 2012-06-09
    • 2019-03-14
    • 2014-10-21
    • 1970-01-01
    • 2021-01-20
    相关资源
    最近更新 更多