【问题标题】:How Aggregate value without explode in Spark using scala language如何使用 Scala 语言在 Spark 中聚合值而不爆炸
【发布时间】:2018-04-02 07:18:42
【问题描述】:

我使用 Spark 2.2 版本和 Scala 作为编程语言。

输入数据:

{"amount":"2.00","cal_group":[{}],"set_id":7057} {"amount":"1.00","cal_group":[{}],"set_id":7057} {"amount":"7.00","cal_group": [{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0},{"abc_cd":"abc00160","abc_cnt":5.0,"cde_cnt":2.0},{"abc_cd":"abc00249","abc_cnt":0.0,"cde_cnt":1.0}],"set_id":7057}

输入数据框:

[2.00,WrappedArray([null,null,null]),7057]
[1.00,WrappedArray([null,null,null]),7057]
[7.00,WrappedArray([abc00160,6.0,7.0],[abc00160,5.0,2.0,],[abc00249,0.0,1.0]),7057]

输入数据架构:

|-- amount: string (nullable = true)
|-- cal_group: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- abc_cd: string (nullable = true)
|    |    |-- abc_cnt: double (nullable = true)
|    |    |-- cde_cnt: double (nullable = true)
|--set_id: double

注意:每个包装数组都是一个结构,其中包含 abc_cd 和其他一些 2 个度量列。

我想对输入数据进行两级聚合。它被称为第 1 步和第 2 步。

第 1 步:

我们需要获取每个 set_id 的金额总和,并在为 cal_group 执行 collect_list 时删除空值

我试过下面的代码:

val res1=res.groupBy($"set_id").agg(sum($"amount").as('amount_total),collect_list(struct($"cal_group")).as('finalgroup))

它按预期给了我 amount 的总和。 但是这里我不知道如何跳过 null WrappedArray cal_group 列。

输出:步骤 1

[7057,10.00,WrappedArray([WrappedArray([null,null,null])],[WrappedArray([null,null,null])],[WrappedArray([null,null,null])],[WrappedArray([abc00160,6.0,7.0],[abc00160,5.0,2.0],[abc00249,0.0,1.0])])

第 2 步:

然后我想在 abc_cd 代码级别聚合 2 个度量(abc_cnt, cde_cnt)。

这里可以通过对 cal_group 列的explode 函数来完成此聚合。它将在行级别转换 cal_group 记录,它将增加行/数据量。

所以,我尝试分解结构并在 abc_cd 上进行分组。

使用explode函数求和的示例代码:

   val res2 = res1.select($"set_id",$"amount_total",explode($"cal_group").as("cal_group"))
    val res1 = res2.select($"set_id",$"amount_total",$"cal_group")
                         .groupBy($"set_id",$"cal_group.abc_cd")
                         .agg(sum($"cal_group.abc_cnt").as('abc_cnt_sum),
                              sum($"cal_group.cde_cnt").as('cde_cnt_sum),
                              )

所以在这里,我不想爆炸 col_group 列。 因为它正在增加音量。

第 2 步后的预期输出:

[7057,10.00,WrappedArray(**[WrappedArray([null,null,null])],
                                       [WrappedArray([null,null,null])],
                                       [WrappedArray([null,null,null])],
                                       [WrappedArray([abc00160,11.0,9.0],
                                                     [abc00249,0.0,1.0])])

是否有任何可用的选项,该函数应在记录级别聚合并在收集之前删除空结构。

提前致谢。

【问题讨论】:

  • 您可以编写一个 UDF 来过滤空结构。但我怀疑这会比explode + filter更有效。
  • @RameshMaharjan,阅读后将变为空。我正在从 HDFS 读取数据作为 parquet 文件。
  • 您要删除结构中的 abc_cd == null 还是全部三个?
  • 我想从结构中删除三个空值。

标签: scala apache-spark apache-spark-sql spark-dataframe


【解决方案1】:

你可以为第二部分聚合定义一个udf函数

import org.apache.spark.sql.functions._
def aggregateUdf = udf((nestedArray: Seq[Seq[Row]])=>
  nestedArray
    .flatMap(x => x
      .map(y => (y(0).asInstanceOf[String], (y(1).asInstanceOf[Double], y(2).asInstanceOf[Double]))))
      .filterNot(_._1 == null)
      .groupBy(_._1)
      .map(x => (x._1, x._2.map(_._2._1).sum, x._2.map(_._2._2).sum)).toArray
)

您可以在第一次聚合后调用udf 函数(也需要通过删除结构部分进行修改

val finalRes=res
  .groupBy($"set_id")
  .agg(sum($"amount").as('amount_total),collect_list($"cal_group").as('finalgroup))
  .withColumn("finalgroup", aggregateUdf('finalgroup))

所以finalRes 将是

+------+------------+-----------------------------------------+
|set_id|amount_total|finalgroup                               |
+------+------------+-----------------------------------------+
|7057  |10.0        |[[abc00249,0.0,1.0], [abc00160,11.0,9.0]]|
+------+------------+-----------------------------------------+

【讨论】:

    【解决方案2】:

    我获取了以下 json 数据并加载以获得与您相同的架构:

    {"amount":"2.00","cal_group":[{}],"set_id":7057.0}
    {"amount":"1.00","cal_group":[{}],"set_id":7057}
    {"amount":"7.00","cal_group": [{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0},{"abc_cd":"abc00160","abc_cnt":5.0,"cde_cnt":2.0},{"abc_cd":"abc00249","abc_cnt":0.0,"cde_cnt":1.0}],"set_id":7057}
    

    但是这里我不知道如何跳过 null WrappedArray cal_group 列

    我认为 collect_list 会自动删除 null,但在您的情况下它不能,因为您使用了 struct 进行不需要的聚合。因此,Step 1 的正确转换是:

    val res1=res.groupBy($"set_id").agg(sum($"amount").as('amount_total),(collect_list($"cal_group")).as('finalgroup))
    

    给出以下输出(showprintSchema

    +------+------------+--------------------------------------------------------------------------+
    |set_id|amount_total|finalgroup                                                                |
    +------+------------+--------------------------------------------------------------------------+
    |7057.0|10.0        |[WrappedArray([abc00160,6.0,7.0], [abc00160,5.0,2.0], [abc00249,0.0,1.0])]|
    +------+------------+--------------------------------------------------------------------------+
    root
     |-- set_id: double (nullable = true)
     |-- amount_total: double (nullable = true)
     |-- finalgroup: array (nullable = true)
     |    |-- element: array (containsNull = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- abc_cd: string (nullable = true)
     |    |    |    |-- abc_cnt: double (nullable = true)
     |    |    |    |-- cde_cnt: double (nullable = true)
    

    第 2 步

    下面假设上面的代码作为第 1 步运行。我只使用了爆炸机制。

    要处理你的数据结构,你必须做两次explode,因为amountcal_group 分组的结构是一个双重嵌套数组。下面是给出所需o/p的代码:

    val res2 = res1.select($"set_id",$"amount_total",explode($"finalgroup").as("cal_group"))
    val res3 = res2.select($"set_id",$"amount_total",explode($"cal_group").as("cal_group_exp"))
    val res4 = res3.groupBy($"set_id",$"cal_group_exp.abc_cd")
                              .agg(sum($"cal_group_exp.abc_cnt").as('abc_cnt_sum),
                                  sum($"cal_group_exp.cde_cnt").as('cde_cnt_sum))
    res4.show(false)
    

    带输出:

    +------+--------+-----------+-----------+
    |set_id|  abc_cd|abc_cnt_sum|cde_cnt_sum|
    +------+--------+-----------+-----------+
    |7057.0|abc00160|       11.0|        9.0|
    |7057.0|abc00249|        0.0|        1.0|
    +------+--------+-----------+-----------+
    

    【讨论】:

    • 感谢@Sujit,我们尝试了类似的方法,但我们在输入结构中有空值,因此即使我们使用 collect_list 也无法删除空值
    • 我已经编辑了我的问题并添加了输入数据。您的输入数据缺少第一两行的 cal_group。
    • 你只需要中间的一个过滤器@Vijay_Shinde,你应该没问题
    猜你喜欢
    • 2022-01-16
    • 1970-01-01
    • 2019-07-24
    • 2021-11-15
    • 1970-01-01
    • 2018-10-05
    • 2016-11-09
    • 2019-04-04
    • 2017-08-08
    相关资源
    最近更新 更多