【问题标题】:How to change the datatype of a column in StructField of a StructType?如何更改 StructType 的 StructField 中列的数据类型?
【发布时间】:2019-06-22 16:44:21
【问题描述】:

我正在尝试更改从 RDBMS 数据库读取的数据框中存在的列的数据类型。 为此,我通过以下方式获得了数据框的架构:

val dataSchema = dataDF.schema

为了查看数据框的架构,我使用了以下语句:

println(dataSchema.schema)

Output: StructType(StructField(je_header_id,LongType,true), StructField(je_line_num,LongType,true), StructField(last_update_date,TimestampType,true), StructField(last_updated_by,DecimalType(15,0),true), StructField(creation_date,TimestampType,true), StructField(created_by,DecimalType(15,0),true), StructField(created_by_name,StringType,true), StructField(entered_dr,DecimalType(38,30),true), StructField(entered_cr,DecimalType(38,30),true))

我的要求是找到 DecimalType 并将其从上述架构更改为 DoubleType。 我可以使用以下方法获取列名和数据类型:dataSchema.dtype 但它以((columnName1, column datatype),(columnName2, column datatype)....(columnNameN, column datatype)) 的格式为我提供了数据类型

我试图找到一种方法来解析 StructType 并徒劳地更改 dataSchema 中的架构。

谁能告诉我是否有办法解析 StructType 以便我可以将数据类型更改为我的要求并获得以下格式

StructType(StructField(je_header_id,LongType,true), StructField(je_line_num,LongType,true), StructField(last_update_date,TimestampType,true), StructField(last_updated_by,DoubleType,true), StructField(creation_date,TimestampType,true), StructField(created_by,DoubleType,true), StructField(created_by_name,StringType,true), StructField(entered_dr,DoubleType,true), StructField(entered_cr,DoubleType,true))

【问题讨论】:

  • 映射架构并返回列名或带有强制转换的列名,然后您可以将其用作选择表达式来强制执行新架构

标签: scala apache-spark apache-spark-sql


【解决方案1】:

要修改特定于给定数据类型的 DataFrame Schema,您可以与 StructFielddataType 进行模式匹配,如下所示:

import org.apache.spark.sql.types._

val df = Seq(
  (1L, BigDecimal(12.34), "a", BigDecimal(10.001)),
  (2L, BigDecimal(56.78), "b", BigDecimal(20.002))
).toDF("c1", "c2", "c3", "c4")

val newSchema = df.schema.fields.map{
  case StructField(name, _: DecimalType, nullable, _)
    => StructField(name, DoubleType, nullable)
  case field => field
}
// newSchema: Array[org.apache.spark.sql.types.StructField] = Array(
//   StructField(c1,LongType,false), StructField(c2,DoubleType,true),
//   StructField(c3,StringType,true), StructField(c4,DoubleType,true)
// )

但是,假设您的最终目标是通过更改列类型来转换数据集,则只需遍历目标数据类型的列以迭代 cast 它们会更容易,如下所示:

import org.apache.spark.sql.functions._

val df2 = df.dtypes.
  collect{ case (dn, dt) if dt.startsWith("DecimalType") => dn }.
  foldLeft(df)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))

df2.printSchema
// root
//  |-- c1: long (nullable = false)
//  |-- c2: double (nullable = true)
//  |-- c3: string (nullable = true)
//  |-- c4: double (nullable = true)

[更新]

根据 cmets 的附加要求,如果您只想更改具有正比例的 DecimalType 的架构,只需在方法 guard 中应用正则表达式模式匹配作为 guard 条件@:

val pattern = """DecimalType\(\d+,(\d+)\)""".r

val df2 = df.dtypes.
  collect{ case (dn, dt) if pattern.findFirstMatchIn(dt).map(_.group(1)).getOrElse("0") != "0" => dn }.
  foldLeft(df)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))

【讨论】:

  • 如果列在struct内,你如何改变它
  • @stack0114106,显然现有的解决方案仅用于转换目标数据类型的顶级列,这对于典型的 RDBMS 表(这是 OP 的数据源)来说应该足够了。为了处理任意嵌套的列结构,我会考虑使用类似于SO answer 的递归遍历。
  • 太棒了!.. 非常感谢递归解决方案。
  • @Metadata,因为foldLeft 保持遍历元素的顺序并且withColumn(c, col(c)...) 为新列重用相同的名称,所以列的顺序应该保持不变。在任何情况下,如果出于某种原因更改了列的顺序,您始终可以使用显式的select(例如df2.select(df.columns.map(col): _*))强制执行它。
  • @Metadata,请查看扩展答案。
【解决方案2】:

这是另一种方式:

data.show(false)
data.printSchema

+----+------------------------+----+----------------------+
|col1|col2                    |col3|col4                  |
+----+------------------------+----+----------------------+
|1   |0.003200000000000000    |a   |23.320000000000000000 |
|2   |78787.990030000000000000|c   |343.320000000000000000|
+----+------------------------+----+----------------------+

root
 |-- col1: integer (nullable = false)
 |-- col2: decimal(38,18) (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: decimal(38,18) (nullable = true) 

创建您想要的架构:
示例:

val newSchema = StructType(
  Seq(
    StructField("col1", StringType, true),
    StructField("col2", DoubleType, true),
    StructField("col3", StringType, true),
    StructField("col4", DoubleType, true)
  )
)

将列转换为所需的数据类型。

val newDF = data.selectExpr(newSchema.map(
   col => s"CAST ( ${col.name} As ${col.dataType.sql}) ${col.name}"
  ): _*)

newDF.printSchema

root
 |-- col1: string (nullable = false)
 |-- col2: double (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: double (nullable = true) 

newDF.show(false)
+----+-----------+----+------+
|col1|col2       |col3|col4  |
+----+-----------+----+------+
|1   |0.0032     |a   |23.32 |
|2   |78787.99003|c   |343.32|
+----+-----------+----+------+

【讨论】:

  • 我尝试了你的建议,结果是:使用者:java.lang.IllegalArgumentException:要求失败:小数精度 39 超过最大精度 38 你能告诉我这里可以做什么吗?
【解决方案3】:

公认的解决方案效果很好,但由于 withColumn 的成本很高,因此成本非常高,并且分析器必须为每个 withColumn 分析整个 DF,并且对于大量列,成本非常高。我宁愿建议这样做 -

val transformedColumns = inputDataDF.dtypes
      .collect {
        case (dn, dt)
            if (dt.startsWith("DecimalType")) =>
          (dn, DoubleType)
      }

    val transformedDF = inputDataDF.select(transformedColumns.map { fieldType =>
      inputDataDF(fieldType._1).cast(fieldType._2)
    }: _*)

对于一个非常小的数据集,在我的机器上使用 withColumn 方法需要 1 分钟以上,而使用 select 方法需要 100 毫秒。

您可以在此处阅读有关 withColumn 成本的更多信息 - https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015

【讨论】:

    猜你喜欢
    • 2019-12-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-24
    • 2020-07-27
    • 1970-01-01
    • 2019-04-06
    • 2012-02-20
    相关资源
    最近更新 更多