【发布时间】:2017-07-27 22:48:45
【问题描述】:
Spark 2.1.x 在这里。我有一堆 JSON 文件(具有相同的架构),我正在将它们读入单个 Spark Dataset,如下所示:
val ds = spark.read.json("some/path/to/lots/of/json/*.json")
然后我可以打印ds 架构并查看所有内容均已正确读取:
ds.printSchema()
// Outputs:
root
|-- fizz: boolean (nullable = true)
|-- moniker: string (nullable = true)
|-- buzz: string (nullable = true)
|-- foo: string (nullable = true)
|-- bar: string (nullable = true)
请注意moniker 字符串列。我现在想:
- 向该数据集和/或其架构添加三个新列; (a) 名为
special_date的日期/时间列,(b) 名为special_uuid的 UUID 列和 (c) 名为special_phrase的字符串列;那么 - 我需要遍历
ds中的所有记录,并且对于每条记录,将其moniker值传递给三个后续函数:(a)deriveSpecialDate(val moniker : String) : Date、(b)deriveSpecialUuid(val moniker : String) : UUID和(c) @ 987654333@。然后,每个函数的输出都需要成为相应列的记录值。
我的最佳尝试:
val ds = spark.read.json("some/path/to/lots/of/json/*.json")
ds.foreach(record => {
val moniker : String = record.select("moniker")
val specialDate : Date = deriveSpecialDate(moniker)
val specialUuid : UUID = deriveSpecialUuid(moniker)
val specialPhrase : String = deriveSpecialPhrase(moniker)
// This doesn't work because special_* fields don't exist in the original
// schema dervied from the JSON files. We're ADDING these columns after the
// JSON read and then populating their values dynamically.
record.special_date = specialDate
record.special_uuid = specialUuid
record.special_phrase = specialPhrase
})
知道如何实现吗?
【问题讨论】:
-
所以基本上你想通过调用每个函数来添加三列?\
-
嗨@ShankarKoirala (+1) - 没错!
标签: apache-spark