【Question Title】: Cumulate arrays from earlier rows (PySpark dataframe)
【Posted】: 2017-01-05 02:31:52
【Question Description】:

A (Python) example will make my question clear. Suppose I have a Spark dataframe containing people who watched certain movies on certain dates, like this:

movierecord = spark.createDataFrame([("Alice", 1, ["Avatar"]),("Bob", 2, ["Fargo", "Tron"]),("Alice", 4, ["Babe"]), ("Alice", 6, ["Avatar", "Airplane"]), ("Alice", 7, ["Pulp Fiction"]), ("Bob", 9, ["Star Wars"])],["name","unixdate","movies"])

The schema and the dataframe defined above look as follows:

root
 |-- name: string (nullable = true)
 |-- unixdate: long (nullable = true)
 |-- movies: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-----+--------+------------------+
|name |unixdate|movies            |
+-----+--------+------------------+
|Alice|1       |[Avatar]          |
|Bob  |2       |[Fargo, Tron]     |
|Alice|4       |[Babe]            |
|Alice|6       |[Avatar, Airplane]|
|Alice|7       |[Pulp Fiction]    |
|Bob  |9       |[Star Wars]       |
+-----+--------+------------------+

From the above I want to generate a new dataframe column containing all the movies previously watched by each user, without duplicates ("previously" with respect to the unixdate field). So it should look like this:

+-----+--------+------------------+------------------------+
|name |unixdate|movies            |previous_movies         |
+-----+--------+------------------+------------------------+
|Alice|1       |[Avatar]          |[]                      |
|Bob  |2       |[Fargo, Tron]     |[]                      |
|Alice|4       |[Babe]            |[Avatar]                |
|Alice|6       |[Avatar, Airplane]|[Avatar, Babe]          |
|Alice|7       |[Pulp Fiction]    |[Avatar, Babe, Airplane]|
|Bob  |9       |[Star Wars]       |[Fargo, Tron]           |
+-----+--------+------------------+------------------------+

How can I achieve this in a really efficient way?

【Question Discussion】:

    Tags: apache-spark dataframe pyspark apache-spark-sql


    【Solution 1】:

    SQL only, not preserving the order of objects:

    • Required imports:

      import pyspark.sql.functions as f
      from pyspark.sql.window import Window
      
    • Window definition:

      w = Window.partitionBy("name").orderBy("unixdate")
      
    • Full solution (a note on the max(struct(...)) step follows the result below):

      (movierecord
          # Flatten movies
          .withColumn("previous_movie", f.explode("movies"))
          # Collect unique
          .withColumn("previous_movies", f.collect_set("previous_movie").over(w))
          # Drop duplicates for a single unixdate
          .groupBy("name", "unixdate")
          .agg(f.max(f.struct(
              f.size("previous_movies"),
              f.col("movies").alias("movies"),
              f.col("previous_movies").alias("previous_movies")
          )).alias("tmp"))
          # Shift by one and extract
         .select(
             "name", "unixdate", "tmp.movies", 
             f.lag("tmp.previous_movies", 1).over(w).alias("previous_movies")))
      
    • Result:

       +-----+--------+------------------+------------------------+
       |name |unixdate|movies            |previous_movies         |
       +-----+--------+------------------+------------------------+
       |Bob  |2       |[Fargo, Tron]     |null                    |
       |Bob  |9       |[Star Wars]       |[Fargo, Tron]           |
       |Alice|1       |[Avatar]          |null                    |
       |Alice|4       |[Babe]            |[Avatar]                |
       |Alice|6       |[Avatar, Airplane]|[Babe, Avatar]          |
       |Alice|7       |[Pulp Fiction]    |[Babe, Airplane, Avatar]|
       +-----+--------+------------------+------------------------+
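
    A note on the max(struct(...)) step above: Spark compares structs field by field, so within each (name, unixdate) group the max() call keeps the row whose previous_movies set is largest, i.e. the fully accumulated one. A minimal, illustrative check of that ordering (my own example, not part of the original answer; the column names are made up):

      probe = spark.createDataFrame([(1, "partial"), (3, "full")], ["size", "tag"])
      # Structs order lexicographically, so the struct with size=3 is the max.
      probe.agg(f.max(f.struct("size", "tag")).alias("m")).select("m.tag").show()
      # Expected: a single row containing "full".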
      

    SQL and a Python UDF, preserving order:

    • Imports:

      import pyspark.sql.functions as f
      from pyspark.sql.window import Window
      from pyspark.sql import Column
      from pyspark.sql.types import ArrayType, StringType
      
      from typing import List, Union
      
      # https://github.com/pytoolz/toolz
      from toolz import unique, concat, compose
      
    • UDF (a quick check of the underlying toolz pipeline follows the result below):

      def flatten_distinct(col: Union[Column, str]) -> Column:
          def flatten_distinct_(xss: Union[List[List[str]], None]) -> List[str]:
              return compose(list, unique, concat)(xss or [])
          return f.udf(flatten_distinct_, ArrayType(StringType()))(col)
      
    • Window definition: same as before.

    • Full solution:

      (movierecord
          # Collect lists
          .withColumn("previous_movies", f.collect_list("movies").over(w))
          # Flatten and drop duplicates
          .withColumn("previous_movies", flatten_distinct("previous_movies"))
          # Shift by one
          .withColumn("previous_movies", f.lag("previous_movies", 1).over(w))
          # For presentation only
          .orderBy("unixdate")) 
      
    • Result:

      +-----+--------+------------------+------------------------+
      |name |unixdate|movies            |previous_movies         |
      +-----+--------+------------------+------------------------+
      |Alice|1       |[Avatar]          |null                    |
      |Bob  |2       |[Fargo, Tron]     |null                    |
      |Alice|4       |[Babe]            |[Avatar]                |
      |Alice|6       |[Avatar, Airplane]|[Avatar, Babe]          |
      |Alice|7       |[Pulp Fiction]    |[Avatar, Babe, Airplane]|
      |Bob  |9       |[Star Wars]       |[Fargo, Tron]           |
      +-----+--------+------------------+------------------------+
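
    The order preservation in this variant comes from the toolz pipeline inside flatten_distinct: concat flattens the collected lists and unique keeps only the first occurrence of each title. A quick sanity check on plain Python data (illustrative only, no Spark involved):

      from toolz import unique, concat, compose

      flatten_distinct_ = compose(list, unique, concat)
      print(flatten_distinct_([["Avatar"], ["Babe"], ["Avatar", "Airplane"]]))
      # ['Avatar', 'Babe', 'Airplane'] -- duplicates dropped, first-seen order kept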
      

    Performance

    Given the constraints, I don't believe there is an efficient way to solve this. Not only does the requested output require significant data duplication (the data is binary-encoded to fit the Tungsten format, so you get possible compression but lose object identity), it also involves a number of operations that are expensive in the Spark computing model, including costly grouping and sorting.

    This should be fine if the expected size of previous_movies is bounded and small, but in general it won't be feasible.

    The data duplication is easy to address by keeping a single, lazy history per user. That is not something that can be done in SQL, but it is quite easy with low-level RDD operations, as sketched below.

    The explode and collect_ patterns are expensive. If your requirements are strict but you want to improve performance, you can use a Scala UDF in place of the Python one.
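
    For completeness, the RDD approach mentioned above could look roughly like the sketch below: keep one running history per user instead of materialising the full prefix on every row. This is my own illustration, not part of the original answer, and it assumes each user's records fit comfortably in memory after grouping:

      from pyspark.sql import Row

      def with_history(rows):
          # Accumulate previously seen movies for a single user, in first-seen order.
          seen, seen_set = [], set()
          for r in sorted(rows, key=lambda r: r.unixdate):
              yield Row(name=r.name, unixdate=r.unixdate,
                        movies=r.movies, previous_movies=list(seen))
              for m in r.movies:
                  if m not in seen_set:
                      seen_set.add(m)
                      seen.append(m)

      previous = (movierecord.rdd
                  .groupBy(lambda r: r.name)               # one group per user
                  .flatMap(lambda kv: with_history(kv[1]))
                  .toDF()
                  .select("name", "unixdate", "movies", "previous_movies"))

    Unlike the window-based solutions, this gives the first row per user an empty list rather than null, which matches the output requested in the question.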

    【Discussion】:
