【问题标题】:Union two DataFrame using spark 2.x with different schema/dataTypes使用具有不同架构/数据类型的 spark 2.x 联合两个 DataFrame
【发布时间】:2025-12-07 08:00:02
【问题描述】:

我正在尝试使用 spark 合并多个 hive 表,其中一些具有相同名称的列具有不同的数据类型,尤其是字符串和 bigint。

我的决赛桌 (hiveDF) 应该有如下架构-

+--------------------------+------------+----------+--+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+--+
| announcementtype         | bigint     |          |
| approvalstatus           | string     |          |
| capitalrate              | double     |          |
| cash                     | double     |          |
| cashinlieuprice          | double     |          |
| costfactor               | double     |          |
| createdby                | string     |          |
| createddate              | string     |          |
| currencycode             | string     |          |
| declarationdate          | string     |          |
| declarationtype          | bigint     |          |
| divfeerate               | double     |          |
| divonlyrate              | double     |          |
| dividendtype             | string     |          |
| dividendtypeid           | bigint     |          |
| editedby                 | string     |          |
| editeddate               | string     |          |
| exdate                   | string     |          |
| filerecordid             | string     |          |
| frequency                | string     |          |
| grossdivrate             | double     |          |
| id                       | bigint     |          |
| indicatedannualdividend  | string     |          |
| longtermrate             | double     |          |
| netdivrate               | double     |          |
| newname                  | string     |          |
| newsymbol                | string     |          |
| note                     | string     |          |
| oldname                  | string     |          |
| oldsymbol                | string     |          |
| paydate                  | string     |          |
| productid                | bigint     |          |
| qualifiedratedollar      | double     |          |
| qualifiedratepercent     | double     |          |
| recorddate               | string     |          |
| sharefactor              | double     |          |
| shorttermrate            | double     |          |
| specialdivrate           | double     |          |
| splitfactor              | double     |          |
| taxstatuscodeid          | bigint     |          |
| lastmodifieddate         | timestamp  |          |
| active_status            | boolean    |          |
+--------------------------+------------+----------+--+

这个最终表 (hiveDF) 架构可以使用以下 JSON-

{
"id": -2147483647,
"productId": 150816,
"dividendTypeId": 2,
"dividendType": "Dividend/Capital Gain",
"payDate": null,
"exDate": "2009-03-25",
"oldSymbol": "ILAAX",
"newSymbol": "ILAAX",
"oldName": "",
"newName": "",
"grossDivRate": 0.115,
"shortTermRate": 0,
"longTermRate": 0,
"splitFactor": 0,
"shareFactor": 0,
"costFactor": 0,
"cashInLieuPrice": 0,
"cash": 0,
"note": "0",
"createdBy": "Yahoo",
"createdDate": "2009-08-03T06:44:19.677-05:00",
"editedBy": "Yahoo",
"editedDate": "2009-08-03T06:44:19.677-05:00",
"netDivRate": null,
"divFeeRate": null,
"specialDivRate": null,
"approvalStatus": null,
"capitalRate": null,
"qualifiedRateDollar": null,
"qualifiedRatePercent": null,
"declarationDate": null,
"declarationType": null,
"currencyCode": null,
"taxStatusCodeId": null,
"announcementType": null,
"frequency": null,
"recordDate": null,
"divOnlyRate": 0.115,
"fileRecordID": null,
"indicatedAnnualDividend": null
}

我正在做类似下面的事情-

var hiveDF = spark.sqlContext.sql("select * from final_destination_tableName")
var newDataDF = spark.sqlContext.sql("select * from incremental_table_1 where id > 866000")

我的增量表 (newDataDF) 有一些具有不同数据类型的列。我有大约 10 个增量表,其中某处 bigint 和其他表中的字符串相同,因此无法确定我是否进行类型转换。类型转换可能很容易,但我不确定我应该使用哪种类型,因为那里有多个表。我正在寻找没有类型转换我可以做的任何方法。

例如增量表如下所示-

+--------------------------+------------+----------+--+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+--+
| announcementtype         | string     |          |
| approvalstatus           | string     |          |
| capitalrate              | string     |          |
| cash                     | double     |          |
| cashinlieuprice          | double     |          |
| costfactor               | double     |          |
| createdby                | string     |          |
| createddate              | string     |          |
| currencycode             | string     |          |
| declarationdate          | string     |          |
| declarationtype          | string     |          |
| divfeerate               | string     |          |
| divonlyrate              | double     |          |
| dividendtype             | string     |          |
| dividendtypeid           | bigint     |          |
| editedby                 | string     |          |
| editeddate               | string     |          |
| exdate                   | string     |          |
| filerecordid             | string     |          |
| frequency                | string     |          |
| grossdivrate             | double     |          |
| id                       | bigint     |          |
| indicatedannualdividend  | string     |          |
| longtermrate             | double     |          |
| netdivrate               | string     |          |
| newname                  | string     |          |
| newsymbol                | string     |          |
| note                     | string     |          |
| oldname                  | string     |          |
| oldsymbol                | string     |          |
| paydate                  | string     |          |
| productid                | bigint     |          |
| qualifiedratedollar      | string     |          |
| qualifiedratepercent     | string     |          |
| recorddate               | string     |          |
| sharefactor              | double     |          |
| shorttermrate            | double     |          |
| specialdivrate           | string     |          |
| splitfactor              | double     |          |
| taxstatuscodeid          | string     |          |
| lastmodifieddate         | timestamp  |          |
| active_status            | boolean    |          |
+--------------------------+------------+----------+--+

我正在为类似下面的表格做这个联合-

var combinedDF = hiveDF.unionAll(newDataDF)

但没有运气。我试图给出如下的最终模式,但没有运气-

val rows = newDataDF.rdd
val newDataDF2 = spark.sqlContext.createDataFrame(rows, hiveDF.schema)
var combinedDF = hiveDF.unionAll(newDataDF2)
combinedDF.coalesce(1).write.mode(SaveMode.Overwrite).option("orc.compress", "snappy").orc("/apps/hive/warehouse/" + database + "/" + tableLower + "_temp")

根据this,我尝试了以下-

var combinedDF = sparkSession.read.json(hiveDF.toJSON.union(newDataDF.toJSON).rdd)

最后我想像上面那样写表但没有运气,请帮帮我-

【问题讨论】:

  • 您遇到的错误是什么
  • @ChandanRay java.lang.ClassCastException: org.apache.hadoop.io.Text 无法转换为 org.apache.hadoop.io.LongWritable

标签: scala apache-spark hive apache-spark-sql


【解决方案1】:

在将增量表与现有表合并时,我也遇到过这种情况。一般有2种情况需要处理

1.带有额外列的增量数据:

这可以通过您在此处尝试的正常合并过程来解决。

2。具有相同列名但不同架构的增量数据:

这是一个棘手的问题。一种简单的解决方案是将机器人数据转换为 toJSON 并进行联合 hiveDF.toJSON.union(newDataDF.toJSON)。然而,这将导致 json 模式合并,并将更改现有模式。例如:如果表中的列a:Long和增量表中的a:String,合并后的最终模式将是:字符串。如果你想做 json union 是没有办法回避的。

对此的替代方法是对增量数据进行严格的架构检查。你测试增量表和hive表的schema是否相同,如果schema不同就不要合并。

然而,这有点过于严格,因为对于实时数据,很难实施模式。

所以我解决这个问题的方法是在合并之前有一个单独的浓缩过程。该过程实际上会检查架构,如果传入的列可以升级/降级到当前的配置单元表架构,它会这样做。

本质上,它遍历传入的增量,为每一行将其转换为正确的模式。这花费不高,但为数据的正确性提供了很好的保证。如果该过程无法转换一行。我将这一行放在一边并发出警报,以便可以手动验证数据是否存在生成数据的上游系统中的任何错误。

这是code 我用来验证两个模式是否可合并。

【讨论】: