As mentioned earlier, in Scala we can get Key -> Value pairs, but not the equivalent Python-style representation.
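To make the Key -> Value notation concrete, here is a minimal plain-Scala sketch (no Spark involved). The names `pair` and `byType` are only illustrative, and the values are copied by hand from the sample rows used in this answer. Where Python would print a dict as {'mobile': [...]}, Scala prints the same shape as Map(mobile -> ...).

```scala
// Plain Scala, no Spark needed. `pair` and `byType` are illustrative names,
// and the values are hand-copied from the sample rows in this answer.

// `a -> b` is just syntax for the tuple (a, b)
val pair: (String, Int) = "rating" -> 4

// A Map is built from such pairs: key -> value
val byType: Map[String, Seq[Seq[String]]] = Map(
  "mobile" -> Seq(Seq("1111111113", "1", "2019-10-1114:32:00")),
  "others" -> Seq(
    Seq("1111111111", "3", "2019-10-1114:30:05"),
    Seq("1111111112", "4", "2019-10-1114:16:58")
  )
)

println(pair)                      // the tuple behind the arrow syntax
println(byType("others").length)   // number of records stored under "others"
```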
First, read the data:
scala> val df = Seq((1111111111,"user1-316","2019-10-1114:01:49",1,"others"), (1111111111,"user1","2019-10-1114:25:35",2,"mobile"), (1111111111,"user2","2019-10-1114:30:05",3,"others"), (1111111112,"user2","2019-10-1114:16:58",4,"others"), (1111111113,"user2","2019-10-1114:32:00",1,"mobile")).toDF("cid","uid","date","rating","type")
df: org.apache.spark.sql.DataFrame = [cid: int, uid: string ... 3 more fields]
scala> df.show
+----------+---------+------------------+------+------+
|       cid|      uid|              date|rating|  type|
+----------+---------+------------------+------+------+
|1111111111|user1-316|2019-10-1114:01:49|     1|others|
|1111111111|    user1|2019-10-1114:25:35|     2|mobile|
|1111111111|    user2|2019-10-1114:30:05|     3|others|
|1111111112|    user2|2019-10-1114:16:58|     4|others|
|1111111113|    user2|2019-10-1114:32:00|     1|mobile|
+----------+---------+------------------+------+------+
Now we collect cid, rating and date into a list for each (uid, type) group:
scala> val df1 = df.groupBy($"uid", $"type").agg(collect_list(array($"cid", $"rating", $"date")).as("aggNew"))
df1: org.apache.spark.sql.DataFrame = [uid: string, type: string ... 1 more field]
scala> df1.show(false)
+---------+------+--------------------------------------------------------------------------------------------------+
|uid      |type  |aggNew                                                                                            |
+---------+------+--------------------------------------------------------------------------------------------------+
|user1    |mobile|[WrappedArray(1111111111, 2, 2019-10-1114:25:35)]                                                 |
|user2    |mobile|[WrappedArray(1111111113, 1, 2019-10-1114:32:00)]                                                 |
|user1-316|others|[WrappedArray(1111111111, 1, 2019-10-1114:01:49)]                                                 |
|user2    |others|[WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)]|
+---------+------+--------------------------------------------------------------------------------------------------+
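One thing to be aware of: array(...) needs a single element type, so the integer columns end up as strings inside aggNew (the schema printed at the end of this answer shows element: string). If the original types matter, one option is to use struct instead of array. The following is only a sketch under that assumption, not part of the original approach; the names `spark` and `typed` and the local master setting are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

// Assumption: a local session; inside spark-shell getOrCreate() reuses the existing one
val spark = SparkSession.builder().master("local[*]").appName("struct-sketch").getOrCreate()
import spark.implicits._

// Same sample rows as in the answer
val df = Seq(
  (1111111111, "user1-316", "2019-10-1114:01:49", 1, "others"),
  (1111111111, "user1", "2019-10-1114:25:35", 2, "mobile"),
  (1111111111, "user2", "2019-10-1114:30:05", 3, "others"),
  (1111111112, "user2", "2019-10-1114:16:58", 4, "others"),
  (1111111113, "user2", "2019-10-1114:32:00", 1, "mobile")
).toDF("cid", "uid", "date", "rating", "type")

// struct keeps each field's own type (cid and rating stay integers),
// whereas array has to bring all elements to one common type
val typed = df
  .groupBy($"uid", $"type")
  .agg(collect_list(struct($"cid", $"rating", $"date")).as("aggNew"))

typed.printSchema()
typed.show(false)
```

The trade-off is that each record becomes a struct with named fields rather than a positional list, so downstream code reads aggNew.cid instead of indexing into an array.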
Most importantly, apply groupBy on uid to get the required (key, values) pairs:
scala> df1.groupBy($"uid").agg(collect_list(map($"type", $"aggNew"))).show(false)
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid      |collect_list(map(type, aggNew))                                                                                                                                                                              |
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user1-316|[Map(others -> WrappedArray(WrappedArray(1111111111, 1, 2019-10-1114:01:49)))]                                                                                                                               |
|user1    |[Map(mobile -> WrappedArray(WrappedArray(1111111113, 2, 2019-10-1114:25:35)))]                                                                                                                               |
|user2    |[Map(mobile -> WrappedArray(WrappedArray(1111111113, 1, 2019-10-1114:32:00))), Map(others -> WrappedArray(WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)))]|
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
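The result above is a list of single-entry maps per uid. If one map per uid is preferred (which is closer to a Python dict), a possible variation is to collect (type, aggNew) structs and merge them with map_from_entries, then render the map with to_json. This is a sketch that assumes Spark 2.4 or later, where map_from_entries is available; the names `spark`, `perUser`, `byType` and `json` are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, collect_list, map_from_entries, struct, to_json}

// Assumption: a local session; inside spark-shell getOrCreate() reuses the existing one
val spark = SparkSession.builder().master("local[*]").appName("map-sketch").getOrCreate()
import spark.implicits._

// Same sample rows as in the answer
val df = Seq(
  (1111111111, "user1-316", "2019-10-1114:01:49", 1, "others"),
  (1111111111, "user1", "2019-10-1114:25:35", 2, "mobile"),
  (1111111111, "user2", "2019-10-1114:30:05", 3, "others"),
  (1111111112, "user2", "2019-10-1114:16:58", 4, "others"),
  (1111111113, "user2", "2019-10-1114:32:00", 1, "mobile")
).toDF("cid", "uid", "date", "rating", "type")

val perUser = df
  // Step 1: one list of [cid, rating, date] per (uid, type);
  // the explicit casts make the string element type visible
  .groupBy($"uid", $"type")
  .agg(collect_list(array($"cid".cast("string"), $"rating".cast("string"), $"date")).as("aggNew"))
  // Step 2: one map per uid, keyed by type (type is unique per uid after step 1)
  .groupBy($"uid")
  .agg(map_from_entries(collect_list(struct($"type", $"aggNew"))).as("byType"))
  // Step 3: a JSON string, which reads much like a Python dict literal
  .withColumn("json", to_json($"byType"))

perUser.show(false)
```

The order of the keys inside each map follows the order in which collect_list happened to gather the rows, so it should not be relied upon.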
Adding the schema:
root
 |-- uid: string (nullable = true)
 |-- collect_list(map(type, aggNew)): array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
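Given this nesting (array of map of array of array), it may help to see how the values can be read back. One way, sketched below, is to explode the outer array and then each map into key and value columns. The column aliases `maps`, `entry`, `type` and `records`, and the names `spark`, `nested` and `flat`, are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, collect_list, explode, map}

// Assumption: a local session; inside spark-shell getOrCreate() reuses the existing one
val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
import spark.implicits._

// Same sample rows as in the answer
val df = Seq(
  (1111111111, "user1-316", "2019-10-1114:01:49", 1, "others"),
  (1111111111, "user1", "2019-10-1114:25:35", 2, "mobile"),
  (1111111111, "user2", "2019-10-1114:30:05", 3, "others"),
  (1111111112, "user2", "2019-10-1114:16:58", 4, "others"),
  (1111111113, "user2", "2019-10-1114:32:00", 1, "mobile")
).toDF("cid", "uid", "date", "rating", "type")

// Rebuild the nested result, giving the aggregated column a short alias
val nested = df
  .groupBy($"uid", $"type")
  .agg(collect_list(array($"cid".cast("string"), $"rating".cast("string"), $"date")).as("aggNew"))
  .groupBy($"uid")
  .agg(collect_list(map($"type", $"aggNew")).as("maps"))

val flat = nested
  // one row per map in the array
  .select($"uid", explode($"maps").as("entry"))
  // one row per (key, value) pair of each map
  .select($"uid", explode($"entry").as(Seq("type", "records")))

flat.printSchema()
flat.show(false)
```

Each row of `flat` then holds a uid, a type, and the list of [cid, rating, date] records for that pair, which is the same information as the intermediate df1.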