【问题标题】:Spark - nested columns mergeSpark - 嵌套列合并
【发布时间】:2017-12-18 19:20:46
【问题描述】:

我正在使用 Scala 和 Spark 2.1.1。这是我的 DataFrame 架构的一部分:

root
|-- id: string (nullable = true)
|-- op: string (nullable = true)
|-- before: struct (containsNull = true)
|    |-- a: string (nullable = true)
|    |-- b: string (nullable = true)
|    |-- c: string (nullable = true)
|    |-- d: string (nullable = true)
|-- after: struct (containsNull = true)
|    |-- a: string (nullable = true)
|    |-- c: string (nullable = true)

它描述了对外部数据库的操作。在更新操作的情况下,数据框包含表示更新操作之前和之后的行状态的“之前”和“之后”列。 “之前”始终包含整行,而“之后”仅包含更新的字段。我需要将它们合并到包含更新后整行的列(最终是 DataFrame)中(只需采用“之前”并将其字段的值更改为从“之后”获取的值,如果存在)。我尝试了不同的方法来实现这一点(主要是通过在“之前”和“之后”上执行 UDF 创建一个新列),但我无法完成。

一个有 3 行的示例(为了方便,我将使用 JSON 表示法):

{... "before": {"a": "1", "b": "2", "c": "test", "d": true}, "after": {"b": "3"} ...}
{... "before": {"a": "2", "b": null, "c": "test2", "d": false}, "after": {"c": "test4", "d": true} ...}
{... "before": {"other": "4", "other2": "5"}, "after": {"other": "5"} ...}

我需要什么:

{... "fullAfter": {"a": "1", "b": "3", "c": "test", "d": true} ...}
{... "fullAfter": {"a": "2", "b": null, "c": "test4", "d": true} ...}
{... "fullAfter": {"other": "5", "other2": "5"} ...}

问题是 DataFrame 包含来自不同表的操作,因此 'before' 和 'after' 在每一行中可能有不同的架构。

我尝试通过将“之前”和“之后”转换为 JSON (to_json) 并基于它们创建新的 JSON 在 UDF 中进行一些操作。不幸的是 to_json 方法会导致具有空值的字段消失,因此我无法在没有完整的原始架构的情况下创建完整的行:

{... "before": {"a": "2", "b": null, "c": "test2", "d": false}, "after": {"c": "test4", "d": true} ...}
{... "fullAfter": {"a": "2", "c": "test4", "d": true} ...} - "b" is missing

有什么可行的、简单/有效的方法吗?

【问题讨论】:

  • 你能贴出你目前尝试过的代码吗?
  • 一个数据框应该有一个固定的模式。每行的可变架构不适用于整个数据框。

标签: scala apache-spark dataframe nested


【解决方案1】:

我建议使用 spark sql 来解决问题。 让我们调用你的表 table_with_updates(id,op,before[a,b,c,d],after[a,c])

<!-- language: sql -->
select id, op, struct(
     coalesce(after.a,before.a) as a,
     after.b as b,
     coalesce(after.c,before.c) as c,
     before.d as d) as fullAfter
from table_with_updates

显然,您也可以将 coalesce 用于 b 和 d,但我认为您的 after 架构中缺少它。合并只是取第一个非空值。

使用 UDF,您会遇到必须输入的问题。您将需要 case class Before(a:String,b:String,c:String,d:String) 和 After。这不适用于空值,并且需要大量逻辑和编码。甚至 scala udfs 也比 spark sql 函数慢得多。

如果你需要它更动态,我通常只是写一些代码来从列名生成 sql。 Scala 在这方面做得很好。

 val colsAfter = spark.table("table_with_updates").select($"after").columns()
 val colsBefore = spark.table("table_with_updates").select($"before").columns()
    .map(c => if colsAfter.contains(c) s"coalesce(after.$c,before.$c" else s"before.$c as $c")
 val structFields = colsBefore.mkString(",")

 sql(" select id, op, struct($structFields) as fullAfter from table_with_updates")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-05-19
    • 1970-01-01
    • 1970-01-01
    • 2018-03-25
    • 1970-01-01
    • 2017-08-13
    • 1970-01-01
    相关资源
    最近更新 更多