【问题标题】:Filter array values during aggregation in spark dataframe在火花数据框中聚合期间过滤数组值
【发布时间】:2021-11-19 00:36:40
【问题描述】:

我正在对以下数据框执行聚合以获取具有品牌数组的广告商列表

+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+

这是我的代码:

import org.apache.spark.sql.functions.collect_list

df2
  .groupBy("advertiser")
  .agg(collect_list("brand").as("brands"))

这给了我以下数据框:

+------------+----------------+
|advertiser  |brands          |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+

在聚合期间,我想使用下表过滤品牌列表:

+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

为了实现:

+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
+------------+--------+

【问题讨论】:

    标签: scala apache-spark pyspark apache-spark-sql scala-collections


    【解决方案1】:

    对于您的问题,我看到了两个解决方案,我将其称为 Collect SolutionJoin Solution

    收集解决方案

    如果您可以收集brands 数据框,则可以使用此收集的集合在执行collect_list 时仅保留正确的品牌,然后flatten 您的数组并用null 替换空数组,如下所示:

    import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}
    
    val filteredBrands = brands.select("brand").collect().map(_.getString(0))
    
    val finalDataframe = df2
      .groupBy("advertiser")
      .agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
      .withColumn("brands", flatten(col("brands")))
      .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
    

    加入解决方案

    如果您的brands 数据框不适合内存,您可以先将df2brands 左连接,如果品牌在brands 数据框中,则有一个包含品牌的列,否则null,然后做你的分组,最后替换空数组,因为没有你想过滤的品牌的广告商null

    import org.apache.spark.sql.functions.{col, collect_list}
    
    val finalDataframe = df2
      .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
      .groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
      .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))
    

    详情

    所以如果我们从df2 数据框开始如下:

    +------------+------+
    |advertiser  |brand |
    +------------+------+
    |Advertiser 1|Brand1|
    |Advertiser 1|Brand2|
    |Advertiser 2|Brand3|
    |Advertiser 2|Brand4|
    |Advertiser 3|Brand5|
    |Advertiser 3|Brand6|
    +------------+------+
    

    还有一个brands 数据框如下:

    +------+------------+
    |brand |brand name  |
    +------+------------+
    |Brand1|Brand_name_1|
    |Brand3|Brand_name_3|
    +------+------------+
    

    df2brands 数据帧之间的第一个左外连接(第一行)之后,您会得到以下数据帧:

    +------------+------+--------------+
    |advertiser  |brand |filtered_brand|
    +------------+------+--------------+
    |Advertiser 1|Brand1|Brand1        |
    |Advertiser 1|Brand2|null          |
    |Advertiser 2|Brand3|Brand3        |
    |Advertiser 2|Brand4|null          |
    |Advertiser 3|Brand5|null          |
    |Advertiser 3|Brand6|null          |
    +------------+------+--------------+
    

    当您按广告商对该数据框进行分组,收集过滤品牌列表时,您会得到以下数据框:

    +------------+--------+
    |advertiser  |brands  |
    +------------+--------+
    |Advertiser 2|[Brand3]|
    |Advertiser 3|[]      |
    |Advertiser 1|[Brand1]|
    +------------+--------+
    

    最后,当您应用最后一行将空数组替换为 null 时,您会得到预期的结果:

    +------------+--------+
    |advertiser  |brands  |
    +------------+--------+
    |Advertiser 2|[Brand3]|
    |Advertiser 3|null    |
    |Advertiser 1|[Brand1]|
    +------------+--------+
    

    结论

    收集解决方案 仅创建一个昂贵的 suffle 步骤(在 groupBy 期间),如果您的 brands 数据帧很小,则应优先选择。如果您的 brands 数据框很大,Join 解决方案 有效,但它会创建许多昂贵的 suffle 步骤,一个 groupBy 和一个 join。

    【讨论】:

    • 感谢您的回复,我的brands 数据框很大。由于在brands 数据框内没有任何品牌的广告商似乎被过滤掉了,因此加入解决方案不太有效(Advertiser3)。我想保留所有广告商,并且只保留品牌表中的品牌
    • 不,不在brands 数据帧中的广告商仍然存在于最终数据帧中。它们被添加到最后一个连接中:.join(df2.select("advertiser").distinct(), Seq("advertiser"), "right_outer")
    • 嗯,由于某种原因,我无法通过最后一次加入获得该结果。我认为有一种方法可以在 groupByagg 之前以某种方式过滤所有广告商的品牌。
    • 我改进了一点连接解决方​​案,在df2brands 之间只保留一个连接。我还详细介绍了连接解决方​​案的每个中间数据帧,以帮助您更好地了解每个步骤的操作。
    • 我认为这行得通!感谢您的解释。唯一的问题是,withColumn 在我所做的更新中表现不如预期。我不只是拥有collect_list(col("filtered_brand")).as("brands"),而是将它变成一个带有品牌名称的结构,所以它是collect_list(struct(col("filtered_brand") as "brand_id", col("brand_name") as "brand_name")).as("brands")
    猜你喜欢
    • 1970-01-01
    • 2015-11-06
    • 1970-01-01
    • 2021-01-04
    • 2019-02-27
    • 2019-08-23
    • 2017-02-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多