【问题标题】:How do I run a spark sql aggregator cumulatively?如何累积运行 spark sql 聚合器?
【发布时间】:2019-08-07 19:43:39
【问题描述】:

我目前正在处理一个使用 spark 数据集(Java 语言)的项目,我必须创建一个从累加器派生的新列,该累加器运行在所有先前的行上。

我一直在使用自定义 UserDefinedAggregationFunction 在从 unboundedPrecedingcurrentRow 的窗口上实现此功能。

事情是这样的:

df.withColumn("newColumn", customAccumulator
    .apply(columnInputSeq)
    .over(customWindowSpec));

但是,出于类型安全的原因和通常更简洁的代码,我真的更喜欢使用类型化的数据集。即:在Dataset<CustomType> 上使用org.apache.spark.sql.expressions.Aggregator 执行相同的操作。这里的问题是我查看了所有文档,但无法弄清楚如何使其以与上述相同的方式运行(即我只能获得整个列的最终聚合,而不是每一行的累积状态) .

我正在尝试做的事情是否可行,如果可以,怎么做?

为清楚起见添加了示例:

初始表:

+-------+------+------+
| Index | Col1 | Col2 |
+-------+------+------+
|     1 | abc  | def  |
|     2 | ghi  | jkl  |
|     3 | mno  | pqr  |
|     4 | stu  | vwx  |
+-------+------+------+

然后以聚合操作为例: 先将累加器反转,在Col1前追加Col2并返回这个值,同样设置为累加器。

+-------+------+------+--------------------------+
| Index | Col1 | Col2 |       Accumulator        |
+-------+------+------+--------------------------+
|     1 | abc  | def  | abcdef                   |
|     2 | ghi  | jkl  | ghifedcbajkl             |
|     3 | mno  | pqr  | mnolkjabcdefihgpqr       |
|     4 | stu  | vwx  | sturpqghifedcbajklonmvwx |
+-------+------+------+--------------------------+

使用UserDefinedAggregateFunction 我已经能够生成这个,但是使用Aggregator 我只能得到最后一行。

【问题讨论】:

  • 您能否添加一些有关您正在寻找的聚合和一些示例数据集的信息?
  • @ChitralVerma 我添加了一个示例。请不要只建议我可以使用的内置插件,这不是问题的重点。

标签: java apache-spark apache-spark-sql


【解决方案1】:

你没有

我的消息来源是一位朋友,他一直在研究与此相同的问题,现在得出结论这是不可能的

【讨论】:

  • 真可惜。您能否也提供一个来源,最好是相关文档的链接?
  • 不是文档本身,但希望这会有所帮助:latlmes.com/tech/top-10-java-spark-sql-tricks-1我可以向我的朋友询问更多细节(尽管我确实质疑他的计算机科学证书)
猜你喜欢
  • 1970-01-01
  • 2019-03-25
  • 2016-07-28
  • 1970-01-01
  • 2022-01-13
  • 1970-01-01
  • 2022-10-25
  • 1970-01-01
  • 2016-05-12
相关资源
最近更新 更多