【问题标题】:How do you change schema in a Spark `DataFrame.map()` operation without joins?如何在没有连接的情况下更改 Spark `DataFrame.map()` 操作中的架构?
【发布时间】:2021-06-30 02:35:17
【问题描述】:

在 Spark v3.0.1 中,我有一个任意模式的 DataFrame。

我想将任意模式的 DataFrame 转换为具有相同模式和新列的新 DataFrame,该新列是对每行中离散存在的数据进行计算的结果。

我可以放心地假设某些类型的某些列可用于逻辑计算,尽管 DataFrame 是任意模式。

我之前通过创建一个包含两列的新 Dataset[outcome] 解决了这个问题:

  • 来自输入 DataFrame 的 KEY
  • 计算的结果

...然后在初始输入上加入该 DF 以添加新列:

val inputDf = Seq(
  ("1", "input1", "input2"),
  ("2", "anotherInput1", "anotherInput2"),
).asDF("key", "logicalInput1", "logicalInput2")

case class outcome(key: String, outcome: String)

val outcomes = inputDf.map(row => {
  val input1 = row.getAs[String]("logicalInput1")
  val input2 = row.getAs[String]("logicalInput2")
  val key = row.getAs[String]("key")

  val result = if (input1 != "") input1 + input2 else input2
  outcome(key, result)
})

val finalDf = inputDf.join(outcomes, Seq("key"))

是否有一种更有效的方法可以将 DataFrame 映射到新的 DataFrame,在输入 DF 上给定任意列的额外列,我们可以假设存在一些列来进行计算?

我想获取 inputDF 并在每一行上映射,生成该行的副本并向其添加一个带有结果结果的新列,而无需事后加入...

请注意,在上面的示例中,存在使用 Spark API 的简单解决方案...我的计算并不像将字符串连接在一起那么简单,因此解决方案需要 .map 或 udf。如果可能的话,我想避免使用 UDF,尽管这也可以。

【问题讨论】:

    标签: scala dataframe apache-spark apache-spark-sql


    【解决方案1】:

    在回答有关使用.map 的确切问题之前,我认为值得简要讨论一下为此目的使用UDF。问题的“注释”中提到了 UDF,但没有详细说明。

    当我们在任何 Dataset [1] 上使用 .map(或 .filter.flatMap 和任何其他高阶函数)时,我们会强制 Spark 将整个行完全反序列化为一个对象,将带有函数的对象,然后再次序列化整个对象。这是非常昂贵的。

    UDF 实际上是围绕 Scala 函数的包装器,它将值从某些列路由到 UDF 的参数。因此,Spark 知道 UDF 需要哪些列,哪些不需要,因此我们通过忽略 UDF 不使用的列来节省大量序列化(可能还有 IO)成本。

    此外,查询优化器对.map 并没有真正的帮助,但UDF 可以成为优化器将(理论上)最小化执行成本的更大计划的一部分。

    我相信在问题中提出的那种情况下,UDF 通常会更好。另一个表明 UDF 是一个很好的解决方案的味道是,与其他解决方案相比,它需要的代码很少。

    val outcome = udf { (input1: String, input2: String) =>
        if (input1 != "") input1 + input2 else input2
    }  
    
    inputDf.withColumn("outcome", outcome(col("logicalInput1"), col("logicalInput2")))
    

    现在回答关于使用.map的问题!为了避免连接,我们需要将.map 的结果设为Row,其中包含输入行的所有内容并添加了输出。 Row 实际上是一个类型为Any 的值序列。 Spark 通过使用数据集中的模式信息以类型安全的方式操作这些值。如果我们使用新架构创建新的Row,并为.map 提供新架构的Encoder,Spark 将知道如何为我们创建新的DataFrame

    val newSchema = inputDf.schema.add("outcome", StringType)
    val newEncoder = RowEncoder(newSchema)
    
    inputDf
      .map { row =>
        val rowWithSchema = row.asInstanceOf[GenericRowWithSchema] // This cast might not always be possible!
        
        val input1 = row.getAs[String]("logicalInput1")
        val input2 = row.getAs[String]("logicalInput2")
        val key = row.getAs[String]("key")
    
        val result = if (input1 != "") input1 + input2 else input2
        
        new GenericRowWithSchema(rowWithSchema.toSeq.toArray :+ result, row.schema).asInstanceOf[Row] // Encoder is invariant so we have to cast again.
      }(newEncoder)
      .show()
    
    

    不像 UDF 那样优雅,但它在这种情况下有效。但是,我不确定这个解决方案是否通用。


    [1] DataFrame 只是Dataset[Row] 的别名

    【讨论】:

    • 使用通用行的很好的解释和解决方案,谢谢。
    【解决方案2】:

    您应该使用withColumnUDF。我不明白为什么应该首选 map,而且我认为在 DataFrame API 中附加一列非常困难

    或者你切换到数据集 API

    【讨论】:

    • 感谢您的回复。我想在这种情况下,由于传入数据帧中的所有数据都可用,因此使用 UDF 就可以了...我知道 spark 在围绕 UDF 的查询计划方面存在问题,但是此 UDF 仅对直接数据帧中的数据进行操作,所以应该是高性能的,是吗?
    猜你喜欢
    • 2019-10-22
    • 1970-01-01
    • 2013-05-26
    • 1970-01-01
    • 2012-05-26
    • 2021-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多