【发布时间】:2021-06-30 02:35:17
【问题描述】:
在 Spark v3.0.1 中,我有一个任意模式的 DataFrame。
我想将任意模式的 DataFrame 转换为具有相同模式和新列的新 DataFrame,该新列是对每行中离散存在的数据进行计算的结果。
我可以放心地假设某些类型的某些列可用于逻辑计算,尽管 DataFrame 是任意模式。
我之前通过创建一个包含两列的新 Dataset[outcome] 解决了这个问题:
- 来自输入 DataFrame 的 KEY
- 计算的结果
...然后在初始输入上加入该 DF 以添加新列:
val inputDf = Seq(
("1", "input1", "input2"),
("2", "anotherInput1", "anotherInput2"),
).asDF("key", "logicalInput1", "logicalInput2")
case class outcome(key: String, outcome: String)
val outcomes = inputDf.map(row => {
val input1 = row.getAs[String]("logicalInput1")
val input2 = row.getAs[String]("logicalInput2")
val key = row.getAs[String]("key")
val result = if (input1 != "") input1 + input2 else input2
outcome(key, result)
})
val finalDf = inputDf.join(outcomes, Seq("key"))
是否有一种更有效的方法可以将 DataFrame 映射到新的 DataFrame,在输入 DF 上给定任意列的额外列,我们可以假设存在一些列来进行计算?
我想获取 inputDF 并在每一行上映射,生成该行的副本并向其添加一个带有结果结果的新列,而无需事后加入...
请注意,在上面的示例中,存在使用 Spark API 的简单解决方案...我的计算并不像将字符串连接在一起那么简单,因此解决方案需要 .map 或 udf。如果可能的话,我想避免使用 UDF,尽管这也可以。
【问题讨论】:
标签: scala dataframe apache-spark apache-spark-sql