【发布时间】:2025-12-07 08:00:02
【问题描述】:
我正在尝试使用 spark 合并多个 hive 表,其中一些具有相同名称的列具有不同的数据类型,尤其是字符串和 bigint。
我的决赛桌 (hiveDF) 应该有如下架构-
+--------------------------+------------+----------+--+
| col_name | data_type | comment |
+--------------------------+------------+----------+--+
| announcementtype | bigint | |
| approvalstatus | string | |
| capitalrate | double | |
| cash | double | |
| cashinlieuprice | double | |
| costfactor | double | |
| createdby | string | |
| createddate | string | |
| currencycode | string | |
| declarationdate | string | |
| declarationtype | bigint | |
| divfeerate | double | |
| divonlyrate | double | |
| dividendtype | string | |
| dividendtypeid | bigint | |
| editedby | string | |
| editeddate | string | |
| exdate | string | |
| filerecordid | string | |
| frequency | string | |
| grossdivrate | double | |
| id | bigint | |
| indicatedannualdividend | string | |
| longtermrate | double | |
| netdivrate | double | |
| newname | string | |
| newsymbol | string | |
| note | string | |
| oldname | string | |
| oldsymbol | string | |
| paydate | string | |
| productid | bigint | |
| qualifiedratedollar | double | |
| qualifiedratepercent | double | |
| recorddate | string | |
| sharefactor | double | |
| shorttermrate | double | |
| specialdivrate | double | |
| splitfactor | double | |
| taxstatuscodeid | bigint | |
| lastmodifieddate | timestamp | |
| active_status | boolean | |
+--------------------------+------------+----------+--+
这个最终表 (hiveDF) 架构可以使用以下 JSON-
{
"id": -2147483647,
"productId": 150816,
"dividendTypeId": 2,
"dividendType": "Dividend/Capital Gain",
"payDate": null,
"exDate": "2009-03-25",
"oldSymbol": "ILAAX",
"newSymbol": "ILAAX",
"oldName": "",
"newName": "",
"grossDivRate": 0.115,
"shortTermRate": 0,
"longTermRate": 0,
"splitFactor": 0,
"shareFactor": 0,
"costFactor": 0,
"cashInLieuPrice": 0,
"cash": 0,
"note": "0",
"createdBy": "Yahoo",
"createdDate": "2009-08-03T06:44:19.677-05:00",
"editedBy": "Yahoo",
"editedDate": "2009-08-03T06:44:19.677-05:00",
"netDivRate": null,
"divFeeRate": null,
"specialDivRate": null,
"approvalStatus": null,
"capitalRate": null,
"qualifiedRateDollar": null,
"qualifiedRatePercent": null,
"declarationDate": null,
"declarationType": null,
"currencyCode": null,
"taxStatusCodeId": null,
"announcementType": null,
"frequency": null,
"recordDate": null,
"divOnlyRate": 0.115,
"fileRecordID": null,
"indicatedAnnualDividend": null
}
我正在做类似下面的事情-
var hiveDF = spark.sqlContext.sql("select * from final_destination_tableName")
var newDataDF = spark.sqlContext.sql("select * from incremental_table_1 where id > 866000")
我的增量表 (newDataDF) 有一些具有不同数据类型的列。我有大约 10 个增量表,其中某处 bigint 和其他表中的字符串相同,因此无法确定我是否进行类型转换。类型转换可能很容易,但我不确定我应该使用哪种类型,因为那里有多个表。我正在寻找没有类型转换我可以做的任何方法。
例如增量表如下所示-
+--------------------------+------------+----------+--+
| col_name | data_type | comment |
+--------------------------+------------+----------+--+
| announcementtype | string | |
| approvalstatus | string | |
| capitalrate | string | |
| cash | double | |
| cashinlieuprice | double | |
| costfactor | double | |
| createdby | string | |
| createddate | string | |
| currencycode | string | |
| declarationdate | string | |
| declarationtype | string | |
| divfeerate | string | |
| divonlyrate | double | |
| dividendtype | string | |
| dividendtypeid | bigint | |
| editedby | string | |
| editeddate | string | |
| exdate | string | |
| filerecordid | string | |
| frequency | string | |
| grossdivrate | double | |
| id | bigint | |
| indicatedannualdividend | string | |
| longtermrate | double | |
| netdivrate | string | |
| newname | string | |
| newsymbol | string | |
| note | string | |
| oldname | string | |
| oldsymbol | string | |
| paydate | string | |
| productid | bigint | |
| qualifiedratedollar | string | |
| qualifiedratepercent | string | |
| recorddate | string | |
| sharefactor | double | |
| shorttermrate | double | |
| specialdivrate | string | |
| splitfactor | double | |
| taxstatuscodeid | string | |
| lastmodifieddate | timestamp | |
| active_status | boolean | |
+--------------------------+------------+----------+--+
我正在为类似下面的表格做这个联合-
var combinedDF = hiveDF.unionAll(newDataDF)
但没有运气。我试图给出如下的最终模式,但没有运气-
val rows = newDataDF.rdd
val newDataDF2 = spark.sqlContext.createDataFrame(rows, hiveDF.schema)
var combinedDF = hiveDF.unionAll(newDataDF2)
combinedDF.coalesce(1).write.mode(SaveMode.Overwrite).option("orc.compress", "snappy").orc("/apps/hive/warehouse/" + database + "/" + tableLower + "_temp")
根据this,我尝试了以下-
var combinedDF = sparkSession.read.json(hiveDF.toJSON.union(newDataDF.toJSON).rdd)
最后我想像上面那样写表但没有运气,请帮帮我-
【问题讨论】:
-
您遇到的错误是什么
-
@ChandanRay java.lang.ClassCastException: org.apache.hadoop.io.Text 无法转换为 org.apache.hadoop.io.LongWritable
标签: scala apache-spark hive apache-spark-sql