首先让我们创建我们的测试数据框。
>>> import pandas as pd
>>> data = {
"ID1": [1, 2, 5, 1],
"ID2": [1, 1, 3, 3],
"ID3": [4, 3, 2, 4],
"Visits": [500, 100, 200, 200],
"Investment": [1000, 200, 400, 200]
}
>>> df = spark.createDataFrame(pd.DataFrame(data))
>>> df.show()
+---+---+---+------+----------+
|ID1|ID2|ID3|Visits|Investment|
+---+---+---+------+----------+
| 1| 1| 4| 500| 1000|
| 2| 1| 3| 100| 200|
| 5| 3| 2| 200| 400|
| 1| 3| 4| 200| 200|
+---+---+---+------+----------+
一旦我们有了可以操作的 DataFrame,我们必须定义一个函数,该函数将返回来自列 ID1、ID2 和 ID3 的唯一 ID 列表。
>>> import pyspark.sql.functions as F
>>> from pyspark.sql.types import ArrayType, IntegerType
>>> @F.udf(returnType=ArrayType(IntegerType()))
... def ids_list(*cols):
... return list(set(cols))
现在是时候在 DataFrame 上应用我们的 udf了。
>>> df = df.withColumn('ids', ids_list('ID1', 'ID2', 'ID3'))
>>> df.show()
+---+---+---+------+----------+---------+
|ID1|ID2|ID3|Visits|Investment| ids|
+---+---+---+------+----------+---------+
| 1| 1| 4| 500| 1000| [1, 4]|
| 2| 1| 3| 100| 200|[1, 2, 3]|
| 5| 3| 2| 200| 400|[2, 3, 5]|
| 1| 3| 4| 200| 200|[1, 3, 4]|
+---+---+---+------+----------+---------+
要使用ids 列,我们必须将其分解为单独的行并删除ids 列。
>>> df = df.withColumn("ID", F.explode('ids')).drop('ids')
>>> df.show()
+---+---+---+------+----------+---+
|ID1|ID2|ID3|Visits|Investment| ID|
+---+---+---+------+----------+---+
| 1| 1| 4| 500| 1000| 1|
| 1| 1| 4| 500| 1000| 4|
| 2| 1| 3| 100| 200| 1|
| 2| 1| 3| 100| 200| 2|
| 2| 1| 3| 100| 200| 3|
| 5| 3| 2| 200| 400| 2|
| 5| 3| 2| 200| 400| 3|
| 5| 3| 2| 200| 400| 5|
| 1| 3| 4| 200| 200| 1|
| 1| 3| 4| 200| 200| 3|
| 1| 3| 4| 200| 200| 4|
+---+---+---+------+----------+---+
最后,我们必须按 ID 列对 DataFrame 进行分组并计算总和。最终结果按ID排序。
>>> final_df = (
... df.groupBy('ID')
... .agg( F.sum('Visits'), F.sum('Investment') )
... .orderBy('ID')
... )
>>> final_df.show()
+---+-----------+---------------+
| ID|sum(Visits)|sum(Investment)|
+---+-----------+---------------+
| 1| 800| 1400|
| 2| 300| 600|
| 3| 500| 800|
| 4| 700| 1200|
| 5| 200| 400|
+---+-----------+---------------+
希望对你有用。