【Posted】: 2019-02-12 23:19:53
【Problem description】:
I am trying to move data from GP to HDFS using Scala and Spark.
val execQuery = "select * from schema.tablename"
val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "header_id")
  .option("lowerBound", 19919927)
  .option("upperBound", 28684058)
  .option("numPartitions", 30)
  .load()
val yearDFSchema = yearDF.schema
The schema of yearDF is:
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)
The schema of the same table on Hive, as given by our project, is:
val hiveColumns = "source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String"
So I took hiveColumns and created a new StructType as below:
def convertDatatype(datatype: String): DataType = {
  // Lower-case first so entries like "String" or "Bigint" from hiveColumns
  // match the lowercase case labels below
  datatype.toLowerCase match {
    case "string"    => StringType
    case "bigint"    => LongType
    case "int"       => IntegerType
    case "double"    => DoubleType
    case "date"      => TimestampType
    case "boolean"   => BooleanType
    case "timestamp" => TimestampType
  }
}

val schemaList = hiveColumns.split("\\|")
val newSchema = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)
When I try to apply my new schema, schemaStructType, on yearDF as below, I get an exception:
Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
The exception occurs because a decimal is being converted to a double.
What I don't understand is how to convert the datatypes of the columns table_refresh_delay_min, release_number, change_number, fdm_application_id in the StructType newSchema from DoubleType to the corresponding datatypes present in yearDF's schema. That is:
If a column in yearDFSchema has a decimal datatype with precision greater than zero, decimal(38,30) in this case, I need to convert the datatype of the same column in newSchema to DecimalType(38,30).
Could anyone let me know how I can achieve this?
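One possible approach for the transformation described above, as a rough sketch: look up each newSchema field by name in yearDFSchema and, where the JDBC side reports a DecimalType, carry that type over. This assumes both schemas use identical column names and that yearDFSchema and newSchema are in scope as shown earlier.

```scala
import org.apache.spark.sql.types._

// Map each column name in the JDBC schema to its reported datatype
val jdbcTypes: Map[String, DataType] =
  yearDFSchema.fields.map(f => f.name -> f.dataType).toMap

// For every field in newSchema, prefer the DecimalType from the JDBC
// schema (e.g. decimal(38,30)) over the Double/Long derived from hiveColumns
val alignedSchema = StructType(newSchema.fields.map { f =>
  jdbcTypes.get(f.name) match {
    case Some(dec: DecimalType) => f.copy(dataType = dec)
    case _                      => f
  }
})
```

With alignedSchema, columns such as release_number would remain DecimalType(38,30) instead of DoubleType, which should avoid the "java.math.BigDecimal is not a valid external type for schema of double" error when the schema is applied to yearDF's rows.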
【Discussion】:
Tags: scala apache-spark apache-spark-sql