【发布时间】:2017-12-18 19:20:46
【问题描述】:
我正在使用 Scala 和 Spark 2.1.1。这是我的 DataFrame 架构的一部分:
root
|-- id: string (nullable = true)
|-- op: string (nullable = true)
|-- before: struct (containsNull = true)
| |-- a: string (nullable = true)
| |-- b: string (nullable = true)
| |-- c: string (nullable = true)
| |-- d: string (nullable = true)
|-- after: struct (containsNull = true)
| |-- a: string (nullable = true)
| |-- c: string (nullable = true)
它描述了对外部数据库的操作。在更新操作的情况下,数据框包含表示更新操作之前和之后的行状态的“之前”和“之后”列。 “之前”始终包含整行,而“之后”仅包含更新的字段。我需要将它们合并到包含更新后整行的列(最终是 DataFrame)中(只需采用“之前”并将其字段的值更改为从“之后”获取的值,如果存在)。我尝试了不同的方法来实现这一点(主要是通过在“之前”和“之后”上执行 UDF 创建一个新列),但我无法完成。
一个有 3 行的示例(为了方便,我将使用 JSON 表示法):
{... "before": {"a": "1", "b": "2", "c": "test", "d": true}, "after": {"b": "3"} ...}
{... "before": {"a": "2", "b": null, "c": "test2", "d": false}, "after": {"c": "test4", "d": true} ...}
{... "before": {"other": "4", "other2": "5"}, "after": {"other": "5"} ...}
我需要什么:
{... "fullAfter": {"a": "1", "b": "3", "c": "test", "d": true} ...}
{... "fullAfter": {"a": "2", "b": null, "c": "test4", "d": true} ...}
{... "fullAfter": {"other": "5", "other2": "5"} ...}
问题是 DataFrame 包含来自不同表的操作,因此 'before' 和 'after' 在每一行中可能有不同的架构。
我尝试通过将“之前”和“之后”转换为 JSON (to_json) 并基于它们创建新的 JSON 在 UDF 中进行一些操作。不幸的是 to_json 方法会导致具有空值的字段消失,因此我无法在没有完整的原始架构的情况下创建完整的行:
{... "before": {"a": "2", "b": null, "c": "test2", "d": false}, "after": {"c": "test4", "d": true} ...}
{... "fullAfter": {"a": "2", "c": "test4", "d": true} ...} - "b" is missing
有什么可行的、简单/有效的方法吗?
【问题讨论】:
-
你能贴出你目前尝试过的代码吗?
-
一个数据框应该有一个固定的模式。每行的可变架构不适用于整个数据框。
标签: scala apache-spark dataframe nested