【发布时间】:2018-04-02 07:18:42
【问题描述】:
我使用 Spark 2.2 版本和 Scala 作为编程语言。
输入数据:
{"amount":"2.00","cal_group":[{}],"set_id":7057}
{"amount":"1.00","cal_group":[{}],"set_id":7057}
{"amount":"7.00","cal_group": [{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0},{"abc_cd":"abc00160","abc_cnt":5.0,"cde_cnt":2.0},{"abc_cd":"abc00249","abc_cnt":0.0,"cde_cnt":1.0}],"set_id":7057}
输入数据框:
[2.00,WrappedArray([null,null,null]),7057]
[1.00,WrappedArray([null,null,null]),7057]
[7.00,WrappedArray([abc00160,6.0,7.0],[abc00160,5.0,2.0,],[abc00249,0.0,1.0]),7057]
输入数据架构:
|-- amount: string (nullable = true)
|-- cal_group: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- abc_cd: string (nullable = true)
| | |-- abc_cnt: double (nullable = true)
| | |-- cde_cnt: double (nullable = true)
|--set_id: double
注意:每个包装数组都是一个结构,其中包含 abc_cd 和其他一些 2 个度量列。
我想对输入数据进行两级聚合。它被称为第 1 步和第 2 步。
第 1 步:
我们需要获取每个 set_id 的金额总和,并在为 cal_group 执行 collect_list 时删除空值
我试过下面的代码:
val res1=res.groupBy($"set_id").agg(sum($"amount").as('amount_total),collect_list(struct($"cal_group")).as('finalgroup))
它按预期给了我 amount 的总和。 但是这里我不知道如何跳过 null WrappedArray cal_group 列。
输出:步骤 1
[7057,10.00,WrappedArray([WrappedArray([null,null,null])],[WrappedArray([null,null,null])],[WrappedArray([null,null,null])],[WrappedArray([abc00160,6.0,7.0],[abc00160,5.0,2.0],[abc00249,0.0,1.0])])
第 2 步:
然后我想在 abc_cd 代码级别聚合 2 个度量(abc_cnt, cde_cnt)。
这里可以通过对 cal_group 列的explode 函数来完成此聚合。它将在行级别转换 cal_group 记录,它将增加行/数据量。
所以,我尝试分解结构并在 abc_cd 上进行分组。
使用explode函数求和的示例代码:
val res2 = res1.select($"set_id",$"amount_total",explode($"cal_group").as("cal_group"))
val res1 = res2.select($"set_id",$"amount_total",$"cal_group")
.groupBy($"set_id",$"cal_group.abc_cd")
.agg(sum($"cal_group.abc_cnt").as('abc_cnt_sum),
sum($"cal_group.cde_cnt").as('cde_cnt_sum),
)
所以在这里,我不想爆炸 col_group 列。 因为它正在增加音量。
第 2 步后的预期输出:
[7057,10.00,WrappedArray(**[WrappedArray([null,null,null])],
[WrappedArray([null,null,null])],
[WrappedArray([null,null,null])],
[WrappedArray([abc00160,11.0,9.0],
[abc00249,0.0,1.0])])
是否有任何可用的选项,该函数应在记录级别聚合并在收集之前删除空结构。
提前致谢。
【问题讨论】:
-
您可以编写一个 UDF 来过滤空结构。但我怀疑这会比explode + filter更有效。
-
@RameshMaharjan,阅读后将变为空。我正在从 HDFS 读取数据作为 parquet 文件。
-
您要删除结构中的 abc_cd == null 还是全部三个?
-
我想从结构中删除三个空值。
标签: scala apache-spark apache-spark-sql spark-dataframe