【Question Title】: Convert Group By to Reduce by Key
【Posted】: 2018-02-21 23:35:17
【Question】:

I have a huge PySpark DataFrame on which I need to perform a group by, but I am running into serious performance problems. I need to optimize the code, and I have read that reduceByKey is more efficient.

Here is a sample of the DataFrame.

a = [('Bob', 562,"Food", "12 May 2018"), ('Bob',880,"Food","01 June 2018"), ('Bob',380,'Household'," 16 June 2018"),  ('Sue',85,'Household'," 16 July 2018"), ('Sue',963,'Household'," 16 Sept 2018")]
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])

Output:

+------+------+---------+-------------+
|Person|Amount|   Budget|         Date|
+------+------+---------+-------------+
|   Bob|   562|     Food|  12 May 2018|
|   Bob|   880|     Food| 01 June 2018|
|   Bob|   380|Household| 16 June 2018|
|   Sue|    85|Household| 16 July 2018|
|   Sue|   963|Household| 16 Sept 2018|
+------+------+---------+-------------+

I have implemented the following code, but as mentioned, the actual DataFrame is huge.

from pyspark.sql import functions as F

df_grouped = df.groupby('person').agg(F.collect_list(F.struct("Amount", "Budget", "Date")).alias("data"))

Output:

+------+--------------------------------------------------------------------------------+
|person|data                                                                            |
+------+--------------------------------------------------------------------------------+
|Sue   |[[85,Household, 16 July 2018], [963,Household, 16 Sept 2018]]                   |
|Bob   |[[562,Food,12 May 2018], [880,Food,01 June 2018], [380,Household, 16 June 2018]]|
+------+--------------------------------------------------------------------------------+

The schema is:

root
 |-- person: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Amount: long (nullable = true)
 |    |    |-- Budget: string (nullable = true)
 |    |    |-- Date: string (nullable = true)

I need to convert the group by into a reduceByKey so that I can produce the same schema as above.

【Question Comments】:

Tags: python group-by pyspark spark-dataframe


【Solution 1】:

How about this:

def flatten(l, ltypes=(tuple,)):
    # Flatten the nested tuples produced by repeated convert() merges,
    # while leaving the inner row lists untouched.
    ltype = type(l)
    l = list(l)
    i = 0
    while i < len(l):
        while isinstance(l[i], ltypes):
            if not l[i]:
                l.pop(i)
                i -= 1
                break
            else:
                l[i:i + 1] = l[i]
        i += 1
    return ltype(l)

def nested_change(item, func):
    # Apply func to every leaf of an arbitrarily nested list.
    if isinstance(item, list):
        return [nested_change(x, func) for x in item]
    return func(item)


def convert(*args):
    # reduceByKey combiner: pair the two accumulated values into a tuple.
    return args

cols = df.columns

# Note: reduceByKey only calls convert() for keys with more than one row;
# a key that appears exactly once keeps its flat [Amount, Budget, Date] list,
# so this assumes every person has at least two rows (as in the sample data).
df_final = df.rdd.map(lambda x: (x['Person'], [x[y] for y in cols if y != 'Person']))\
                 .reduceByKey(convert)\
                 .map(lambda x: (x[0], nested_change(list(flatten(x[1])), str)))\
                 .toDF(['person', 'data'])

df_final.show()
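If it helps to sanity-check the helpers without a Spark session, the nested-tuple shape that reduceByKey builds with convert() can be reproduced in plain Python. This is a standalone sketch (the helpers are restated so the snippet runs on its own; the row values are copied from the sample data above):

```python
def flatten(l, ltypes=(tuple,)):
    # Flatten nested tuples, leaving the inner row lists untouched.
    ltype = type(l)
    l = list(l)
    i = 0
    while i < len(l):
        while isinstance(l[i], ltypes):
            if not l[i]:
                l.pop(i)
                i -= 1
                break
            else:
                l[i:i + 1] = l[i]
        i += 1
    return ltype(l)

def nested_change(item, func):
    # Apply func to every leaf of an arbitrarily nested list.
    if isinstance(item, list):
        return [nested_change(x, func) for x in item]
    return func(item)

# Shape produced for Bob's three rows after two convert() merges:
# convert(row1, row2) -> (row1, row2); convert((row1, row2), row3) -> ((row1, row2), row3)
nested = (([562, 'Food', '12 May 2018'], [880, 'Food', '01 June 2018']),
          [380, 'Household', '16 June 2018'])

rows = nested_change(list(flatten(nested)), str)
print(rows)
# → [['562', 'Food', '12 May 2018'], ['880', 'Food', '01 June 2018'],
#    ['380', 'Household', '16 June 2018']]
```

Flattening only unwraps tuples, so the per-row lists survive intact and nested_change then stringifies every leaf, which matches the all-string `data` column the answer's toDF call produces.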

【Comments】:
