[Posted at]: 2021-09-08 09:34:29
[Problem description]:
I am using the Delta Lake API to update rows in a table with the code below:
DeltaTable.forPath(sparkSession, cleanDataPath)
.as("target")
.merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute();
This should match the source and target tables on every column except valuationtag.
The target table before the merge is:
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317|210611170317|
| Sample|967.93| 2021-06-10| 210611170317|210611170317|
| Sample| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
The source table (which should update the target table) is:
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
Only valuationtag changed, to OFFICIAL. With this, the updated table is:
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
So far so good.
The problem starts when a column contains null values in both tables. Suppose the column desk_mirror_name in the target table is changed to null:
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| null|499.97| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317|210611170317|
| null| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
With a source table containing exactly the same data, except that valuationtag is changed to OFFICIAL, the merge strangely inserts new rows instead of updating the existing ones. The result is:
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| null|499.97| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317|210611170317|
| null| 500.0| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317| OFFICIAL|
| null| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| null|499.97| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
Delta Lake does not seem to handle desk_mirror_name correctly when it is null in both the source and target tables.
How should this specific case be handled?
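A likely explanation (not confirmed here, just SQL semantics) is three-valued logic: target.desk_mirror_name = source.desk_mirror_name evaluates to NULL rather than true when both sides are null, so whenMatched() never fires and the rows fall through to whenNotMatched().insertAll(). Spark SQL has a null-safe equality operator, <=>, which treats two nulls as equal, so rewriting that part of the merge condition as "target.desk_mirror_name <=> source.desk_mirror_name" is one candidate fix. The plain-Java sketch below only mimics the two comparison semantics for illustration; it is not the Delta Lake API itself:

```java
// Models SQL "=" (three-valued logic) vs. Spark SQL "<=>" (null-safe
// equality) to show why a MERGE condition using "=" never matches rows
// whose join columns are null on both sides.
public class NullSafeEqualityDemo {

    // SQL "=": the result is unknown (modeled as Boolean null) when either
    // operand is null. MERGE treats unknown as "not matched".
    static Boolean sqlEquals(Object a, Object b) {
        if (a == null || b == null) return null;
        return a.equals(b);
    }

    // SQL "<=>": null-safe equality; null <=> null is true.
    static boolean nullSafeEquals(Object a, Object b) {
        if (a == null && b == null) return true;
        if (a == null || b == null) return false;
        return a.equals(b);
    }

    public static void main(String[] args) {
        // Non-null values compare the same either way.
        System.out.println(sqlEquals("Sample", "Sample"));      // true
        System.out.println(nullSafeEquals("Sample", "Sample")); // true

        // null = null is unknown under "=", so the MERGE sees no match
        // and inserts a duplicate row; "<=>" would match instead.
        System.out.println(sqlEquals(null, null));              // null
        System.out.println(nullSafeEquals(null, null));         // true
    }
}
```

If this is indeed the cause, applying <=> to every nullable join column in the merge condition should make the null rows update in place instead of duplicating.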
[Discussion]:
Tags: apache-spark merge apache-spark-sql delta-lake