【发布时间】:2019-08-07 19:43:39
【问题描述】:
我目前正在处理一个使用 spark 数据集(Java 语言)的项目,我必须创建一个从累加器派生的新列,该累加器运行在所有先前的行上。
我一直在使用自定义 UserDefinedAggregationFunction 在从 unboundedPreceding 到 currentRow 的窗口上实现此功能。
事情是这样的:
df.withColumn("newColumn", customAccumulator
.apply(columnInputSeq)
.over(customWindowSpec));
但是,出于类型安全的原因和通常更简洁的代码,我真的更喜欢使用类型化的数据集。即:在Dataset<CustomType> 上使用org.apache.spark.sql.expressions.Aggregator 执行相同的操作。这里的问题是我查看了所有文档,但无法弄清楚如何使其以与上述相同的方式运行(即我只能获得整个列的最终聚合,而不是每一行的累积状态) .
我正在尝试做的事情是否可行,如果可以,怎么做?
为清楚起见添加了示例:
初始表:
+-------+------+------+
| Index | Col1 | Col2 |
+-------+------+------+
| 1 | abc | def |
| 2 | ghi | jkl |
| 3 | mno | pqr |
| 4 | stu | vwx |
+-------+------+------+
然后以聚合操作为例: 先将累加器反转,在Col1前追加Col2并返回这个值,同样设置为累加器。
+-------+------+------+--------------------------+
| Index | Col1 | Col2 | Accumulator |
+-------+------+------+--------------------------+
| 1 | abc | def | abcdef |
| 2 | ghi | jkl | ghifedcbajkl |
| 3 | mno | pqr | mnolkjabcdefihgpqr |
| 4 | stu | vwx | sturpqghifedcbajklonmvwx |
+-------+------+------+--------------------------+
使用UserDefinedAggregateFunction 我已经能够生成这个,但是使用Aggregator 我只能得到最后一行。
【问题讨论】:
-
您能否添加一些有关您正在寻找的聚合和一些示例数据集的信息?
-
@ChitralVerma 我添加了一个示例。请不要只建议我可以使用的内置插件,这不是问题的重点。
标签: java apache-spark apache-spark-sql