【Question Title】: Convert a Spark DataFrame to a Scala dictionary-like format
【Posted】: 2019-10-18 09:05:41
【Question】:

I have a Spark DataFrame that I need to convert into (key, value) pairs. This is its format:

+--------------------+--------------------+-------------------+------+------+
|                 cid|                 uid|               date|rating|  type|
+--------------------+--------------------+-------------------+------+------+
|          1111111111|           user1-316|2019-10-11 14:01:49|     1|others|
|          1111111111|               user1|2019-10-11 14:25:35|     2|mobile|
|          1111111111|               user2|2019-10-11 14:30:05|     3|others|
|          1111111112|               user2|2019-10-11 14:16:58|     4|others|
|          1111111113|               user2|2019-10-11 14:32:00|     1|mobile|
+--------------------+--------------------+-------------------+------+------+

I need to aggregate it by uid and, for each type, build a list of (cid, rating, date) entries:

 uid       | history
-----------+--------------------------------------------------------
 user1-316 | {"others": [["1111111111", 1, "2019-10-11 14:01:49"]]}
 user1     | {"mobile": [["1111111111", 2, "2019-10-11 14:25:35"]]}
 user2     | {"others": [["1111111111", 3, "2019-10-11 14:30:05"],["1111111112", 4, "2019-10-11 14:16:58"]],"mobile":[["1111111113", 1, "2019-10-11 14:32:00"]]}

In Python I can achieve this because we have the dict type. How can this be done in Scala?

【Question Discussion】:

    Tags: scala apache-spark dictionary apache-spark-sql


    【Solution 1】:

    Updated answer:

    You can try something like this. I am not sure about Python's dict, but for (key, value) pairs Scala has the Map type.
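
    To make the analogy concrete, here is a minimal plain-Scala sketch of the structure the question is after, with Map standing in for Python's dict (values copied from the sample data):

    ```scala
    // Scala's Map plays the role of Python's dict.
    // The desired "history" value for user2, written as plain Scala:
    val history: Map[String, List[(String, Int, String)]] = Map(
      "others" -> List(
        ("1111111111", 3, "2019-10-11 14:30:05"),
        ("1111111112", 4, "2019-10-11 14:16:58")),
      "mobile" -> List(
        ("1111111113", 1, "2019-10-11 14:32:00"))
    )

    history("mobile").head._2  // rating of user2's single mobile record: 1
    ```

    The Spark-side question is then how to build a column with this Map shape, which the snippets below work toward.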

    
    scala> df.show
    +----------+---------+-------------------+------+------+
    |       cid|      uid|               date|rating|  type|
    +----------+---------+-------------------+------+------+
    |1111111111|user1-316|2019-10-11 14:01:49|     1|others|
    |1111111111|    user1|2019-10-11 14:25:35|     2|mobile|
    |1111111111|    user2|2019-10-11 14:30:05|     3|others|
    |1111111112|    user2|2019-10-11 14:16:58|     4|others|
    |1111111113|    user2|2019-10-11 14:32:00|     1|mobile|
    +----------+---------+-------------------+------+------+
    
    scala> df.withColumn("col1",array("cid","rating","date"))
             .groupBy("type","uid")
             .agg(map(col("type"),collect_list("col1")).as("col2")) 
             .groupBy("uid")
             .agg(collect_list(col("col2")).as("history"))
             .show(false)
    
    +---------+----------------------------------------------------------------------------------------------------------------------------------------------+
    |uid      |history                                                                                                                                       |
    +---------+----------------------------------------------------------------------------------------------------------------------------------------------+
    |user1-316|[[others -> [[1111111111, 1, 2019-10-11 14:01:49]]]]                                                                                          |
    |user1    |[[mobile -> [[1111111111, 2, 2019-10-11 14:25:35]]]]                                                                                          |
    |user2    |[[others -> [[1111111111, 3, 2019-10-11 14:30:05], [1111111112, 4, 2019-10-11 14:16:58]]], [mobile -> [[1111111113, 1, 2019-10-11 14:32:00]]]]|
    +---------+----------------------------------------------------------------------------------------------------------------------------------------------+
    
    
    

    【Discussion】:

    • My bad... I didn't read the question properly. I have updated my answer, but your last requirement is still unsolved (having others/mobile as a dict/map at the top level instead of inside a list).
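
    The remaining gap — getting others/mobile as top-level keys of a single map per uid, and ultimately the JSON string shown in the question — could plausibly be closed with map_from_entries (available since Spark 2.4). A hedged sketch, not run against the original data; it assumes a spark-shell session where spark.implicits._ is already in scope:

    ```scala
    import org.apache.spark.sql.functions._

    // Collect (type, records) pairs per uid, then fold them into a single
    // MapType column so "others"/"mobile" become top-level keys (Spark 2.4+).
    val result = df
      .withColumn("rec", array($"cid", $"rating", $"date"))
      .groupBy("uid", "type")
      .agg(collect_list($"rec").as("recs"))
      .groupBy("uid")
      .agg(map_from_entries(collect_list(struct($"type", $"recs"))).as("history"))

    // to_json renders each map as a {"others": [[...]], "mobile": [[...]]}
    // string like the one in the question.
    result.withColumn("history_json", to_json($"history")).show(false)
    ```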
    【Solution 2】:

    As mentioned, in Scala we can use Key -> Value pairs (the Map type); it just prints differently than Python's dict.

    First, read the data (note the timestamps match the question's, with a space between date and time):

    scala> val df = Seq((1111111111,"user1-316","2019-10-11 14:01:49",1,"others"), (1111111111,"user1","2019-10-11 14:25:35",2,"mobile"), (1111111111,"user2","2019-10-11 14:30:05",3,"others"), (1111111112,"user2","2019-10-11 14:16:58",4,"others"), (1111111113,"user2","2019-10-11 14:32:00",1,"mobile")).toDF("cid","uid","date","rating","type")
    df: org.apache.spark.sql.DataFrame = [cid: int, uid: string ... 3 more fields]
    scala> df.show
    +----------+---------+-------------------+------+------+
    |       cid|      uid|               date|rating|  type|
    +----------+---------+-------------------+------+------+
    |1111111111|user1-316|2019-10-11 14:01:49|     1|others|
    |1111111111|    user1|2019-10-11 14:25:35|     2|mobile|
    |1111111111|    user2|2019-10-11 14:30:05|     3|others|
    |1111111112|    user2|2019-10-11 14:16:58|     4|others|
    |1111111113|    user2|2019-10-11 14:32:00|     1|mobile|
    +----------+---------+-------------------+------+------+


    Now, collect cid, rating, date into a list per (uid, type):

    scala> val df1 = df.groupBy($"uid", $"type").agg(collect_list(array($"cid", $"rating", $"date")).as("aggNew"))
    df1: org.apache.spark.sql.DataFrame = [uid: string, type: string ... 1 more field]

    scala> df1.show(false)
    +---------+------+----------------------------------------------------------------------------------------------------+
    |uid      |type  |aggNew                                                                                              |
    +---------+------+----------------------------------------------------------------------------------------------------+
    |user1    |mobile|[WrappedArray(1111111111, 2, 2019-10-11 14:25:35)]                                                  |
    |user2    |mobile|[WrappedArray(1111111113, 1, 2019-10-11 14:32:00)]                                                  |
    |user1-316|others|[WrappedArray(1111111111, 1, 2019-10-11 14:01:49)]                                                  |
    |user2    |others|[WrappedArray(1111111111, 3, 2019-10-11 14:30:05), WrappedArray(1111111112, 4, 2019-10-11 14:16:58)]|
    +---------+------+----------------------------------------------------------------------------------------------------+


    Most importantly, apply groupBy on uid to get the desired (key, value) pairs:

    scala> df1.groupBy($"uid").agg(collect_list(map($"type", $"aggNew"))).show(false)
    +---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |uid      |collect_list(map(type, aggNew))                                                                                                                                                                                 |
    +---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |user1-316|[Map(others -> WrappedArray(WrappedArray(1111111111, 1, 2019-10-11 14:01:49)))]                                                                                                                                 |
    |user1    |[Map(mobile -> WrappedArray(WrappedArray(1111111111, 2, 2019-10-11 14:25:35)))]                                                                                                                                 |
    |user2    |[Map(mobile -> WrappedArray(WrappedArray(1111111113, 1, 2019-10-11 14:32:00))), Map(others -> WrappedArray(WrappedArray(1111111111, 3, 2019-10-11 14:30:05), WrappedArray(1111111112, 4, 2019-10-11 14:16:58)))]|
    +---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


    Adding the schema:

    root
     |-- uid: string (nullable = true)
     |-- collect_list(map(type, aggNew)): array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: string
     |    |    |-- value: array (valueContainsNull = true)
     |    |    |    |-- element: array (containsNull = true)
     |    |    |    |    |-- element: string (containsNull = true)

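
    If an actual Scala Map is needed on the driver, rather than a MapType/array column, the grouped rows can be collected and each uid's single-entry maps merged. This is a sketch under the assumption that the result is small enough to collect; the element types mirror the schema above (the array elements come back as strings because array() coerces cid and rating):

    ```scala
    import scala.collection.Map  // Spark hands back scala.collection.Map inside Rows

    // Merge each uid's list of single-entry maps into one Map on the driver.
    val grouped = df1.groupBy($"uid")
      .agg(collect_list(map($"type", $"aggNew")).as("history"))

    val perUid: scala.collection.immutable.Map[String, Map[String, Seq[Seq[String]]]] =
      grouped.collect().map { row =>
        val uid  = row.getAs[String]("uid")
        val maps = row.getAs[Seq[Map[String, Seq[Seq[String]]]]]("history")
        uid -> maps.reduce(_ ++ _)  // user2 -> Map(others -> ..., mobile -> ...)
      }.toMap
    ```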

    【Discussion】:

    • This gives us a list of lists, but I need mobile/others to sit inside a dict rather than inside a list. Is that possible?
    • I added the schema to support my point. Dict is Python's key-value collection type, and Map is Scala's; the only differences are the name and the printed representation. You can see the "map" keyword in the last operation. (I agree that needing collect_list every time is odd, but Spark requires it to execute; I tried without it and it throws an error. We could use "map" alone without groupBy, but in your case groupBy is a must, and the key-value pairs then end up inside a list. That is just the Spark way.) But I am sure my result is not a list of lists :)