【Question Title】: How to compare two StructTypes in Scala and change the datatype of columns?
【Posted】: 2019-02-12 23:19:53
【Question Description】:

I am trying to move data from GP to HDFS using Scala and Spark.

val execQuery    = "select * from schema.tablename"
val yearDF       = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "header_id")
  .option("lowerBound", 19919927)
  .option("upperBound", 28684058)
  .option("numPartitions", 30)
  .load()
val yearDFSchema = yearDF.schema

The schema of yearDF is:

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

The schema of the same table on Hive, as given by our project:

val hiveColumns = "source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String"

So I took hiveColumns and created a new StructType from it as shown below:

import org.apache.spark.sql.types._

def convertDatatype(datatype: String): DataType = {
  // Match case-insensitively, since hiveColumns uses names like "String" and "Bigint".
  val convert = datatype.toLowerCase match {
    case "string"     => StringType
    case "bigint"     => LongType
    case "int"        => IntegerType
    case "double"     => DoubleType
    case "date"       => TimestampType
    case "boolean"    => BooleanType
    case "timestamp"  => TimestampType
  }
  convert
}


val schemaList = hiveColumns.split("\\|")
val newSchema  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

When I try to apply my new schema (newSchema) on yearDF, I get an exception:

 Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double

The exception occurs because of the conversion of decimal to double. What I don't understand is how to change the datatype of the columns table_refresh_delay_min, release_number, change_number and fdm_application_id in the StructType newSchema from DoubleType to the corresponding datatypes present in yearDF's schema. I.e.

If a column in yearDFSchema has a decimal datatype with a non-zero scale (decimal(38,30) in this case), I need to change the datatype of that same column in newSchema to DecimalType(38,30).
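In other words, something along the lines of this sketch is what I'm after (matching columns by name is my assumption; reconciledSchema is just an illustrative name):

import org.apache.spark.sql.types._

// For each field of newSchema, keep the decimal type from yearDFSchema
// whenever that column is a decimal with non-zero scale.
val sourceTypes = yearDFSchema.fields.map(f => f.name -> f.dataType).toMap

val reconciledSchema = StructType(newSchema.fields.map { f =>
  sourceTypes.get(f.name) match {
    case Some(d: DecimalType) if d.scale > 0 => f.copy(dataType = d)
    case _                                   => f
  }
})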

Could anyone let me know how I can achieve this?

【Question Discussion】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    This kind of error occurs when you try to apply a schema to an RDD[Row] using the developer's API functions:

    def createDataFrame(rows: List[Row], schema: StructType): DataFrame
    def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
    def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
    

    In such cases the stored data types have to match the external (i.e. Scala value) data types as listed in the official SQL type mapping, and no type casting or coercion is applied.

    It is therefore your responsibility as a user to make sure that the data and the schema are compatible.
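    For illustration, here is a minimal sketch reproducing such a mismatch (the column name x is made up): a Row holding a java.math.BigDecimal against a schema that declares DoubleType fails as soon as the data is evaluated:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // No coercion is applied, so evaluating the DataFrame raises
    // the same error as in the question.
    val rows   = sc.parallelize(Seq(Row(new java.math.BigDecimal("1.5"))))
    val schema = StructType(Seq(StructField("x", DoubleType)))
    spark.createDataFrame(rows, schema).show()
    // java.lang.RuntimeException: java.math.BigDecimal is not a valid
    // external type for schema of double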

    The problem description you've provided indicates a rather different scenario, one that calls for a CAST. Let's create a dataset with exactly the same schema as in your example:

    val yearDF = spark.createDataFrame(
      sc.parallelize(Seq[Row]()),
      StructType(Seq(
        StructField("source_system_name", StringType),
        StructField("table_refresh_delay_min", DecimalType(38, 30)),
        StructField("release_number", DecimalType(38, 30)),
        StructField("change_number", DecimalType(38, 30)),
        StructField("interface_queue_enabled_flag", StringType),
        StructField("rework_enabled_flag", StringType),
        StructField("fdm_application_id", DecimalType(15, 0)),
        StructField("history_enabled_flag", StringType)
    )))
    
    yearDF.printSchema
    
    root
     |-- source_system_name: string (nullable = true)
     |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
     |-- release_number: decimal(38,30) (nullable = true)
     |-- change_number: decimal(38,30) (nullable = true)
     |-- interface_queue_enabled_flag: string (nullable = true)
     |-- rework_enabled_flag: string (nullable = true)
     |-- fdm_application_id: decimal(15,0) (nullable = true)
     |-- history_enabled_flag: string (nullable = true)
    

    and the desired types, like:

    val dtypes = Seq(
      "source_system_name" -> "string",
      "table_refresh_delay_min" -> "double",
      "release_number" -> "double",
      "change_number" -> "double",
      "interface_queue_enabled_flag" -> "string",
      "rework_enabled_flag" -> "string",
      "fdm_application_id" -> "long",
      "history_enabled_flag" -> "string"
    )
    

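    As an aside, a small sketch: the same pairs could also be derived from the hiveColumns string in the question instead of being written out by hand (assuming that string is the source of truth; cast accepts lowercase SQL type names such as "double" and "bigint"):

    val dtypesFromHive = hiveColumns.split("\\|").toSeq.map { s =>
      val Array(name, tpe) = s.split(":")
      name -> tpe.toLowerCase  // e.g. "Bigint" -> "bigint"
    }
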
    Then you can just map over the columns and cast:

    import org.apache.spark.sql.functions.col

    val mapping = dtypes.toMap

    yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
    
    root
     |-- source_system_name: string (nullable = true)
     |-- table_refresh_delay_min: double (nullable = true)
     |-- release_number: double (nullable = true)
     |-- change_number: double (nullable = true)
     |-- interface_queue_enabled_flag: string (nullable = true)
     |-- rework_enabled_flag: string (nullable = true)
     |-- fdm_application_id: long (nullable = true)
     |-- history_enabled_flag: string (nullable = true)
    

    This, of course, assumes that the actual and desired types are compatible and that CAST is allowed.
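    For instance, here is a sketch of a disallowed cast (an array column to double), which fails at analysis time rather than silently:

    import spark.implicits._
    import org.apache.spark.sql.functions.col

    Seq((1, Array(1))).toDF("i", "a")
      .select(col("a").cast("double"))
    // org.apache.spark.sql.AnalysisException: ... cannot cast array<int> to double
    // (exact message varies by Spark version)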

    If you still experience problems due to the specifics of a particular JDBC driver, you should consider placing the cast directly in the query, either manually (see In Apache Spark 2.0.0, is it possible to fetch a query from an external database (rather than grab the whole table)?):

    val externalDtypes = Seq(
      "source_system_name" -> "text",
      "table_refresh_delay_min" -> "double precision",
      "release_number" -> "float8",
      "change_number" -> "float8",
      "interface_queue_enabled_flag" -> "text",
      "rework_enabled_flag" -> "text",
      "fdm_application_id" -> "bigint",
      "history_enabled_flag" -> "text"
    )

    // Build the casted projection list. Identifier quoting is database-specific:
    // double quotes for Postgres/Greenplum, backticks for MySQL.
    val fields = externalDtypes.map {
      case (c, t) => s"""CAST("$c" AS $t)"""
    }.mkString(", ")

    val dbTable = s"""(select $fields from schema.tablename) as tmp"""
    
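    A hypothetical usage of that sub-query, reusing the connection options from the question:

    // Sketch: pass the casted sub-query as the JDBC "dbtable" option.
    val castedDF = spark.read.format("jdbc")
      .option("url", connectionUrl)
      .option("dbtable", dbTable)
      .option("user", devUserName)
      .option("password", devPassword)
      .load()

    castedDF.printSchema  // types already casted on the database side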

    or via a custom schema:

    spark.read
      .format("jdbc")
      .option(
        "customSchema",
        dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
      ...
      .load()
    
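    Note that customSchema only changes the Spark-side types used for the fetched columns; the declared types still have to be compatible with what the database actually returns.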

    【Discussion】:
