【问题标题】:Adding multiple columns to Spark Dataset while iterating its records在迭代其记录时向 Spark 数据集添加多列
【发布时间】:2017-07-27 22:48:45
【问题描述】:

Spark 2.1.x 在这里。我有一堆 JSON 文件(具有相同的架构),我正在将它们读入单个 Spark Dataset,如下所示:

val ds = spark.read.json("some/path/to/lots/of/json/*.json")

然后我可以打印ds 架构并查看所有内容均已正确读取:

ds.printSchema()

// Outputs:
root
 |-- fizz: boolean (nullable = true)
 |-- moniker: string (nullable = true)
 |-- buzz: string (nullable = true)
 |-- foo: string (nullable = true)
 |-- bar: string (nullable = true)

请注意moniker 字符串列。我现在想:

  1. 向该数据集和/或其架构添加三个新列; (a) 名为 special_date 的日期/时间列,(b) 名为 special_uuid 的 UUID 列和 (c) 名为 special_phrase 的字符串列;那么
  2. 我需要遍历ds 中的所有记录,并且对于每条记录,将其moniker 值传递给三个后续函数:(a) deriveSpecialDate(val moniker : String) : Date、(b) deriveSpecialUuid(val moniker : String) : UUID 和(c) @ 987654333@。然后,每个函数的输出都需要成为相应列的记录值。

我的最佳尝试:

val ds = spark.read.json("some/path/to/lots/of/json/*.json")

ds.foreach(record => {
  val moniker : String = record.select("moniker")
  val specialDate : Date = deriveSpecialDate(moniker)
  val specialUuid : UUID = deriveSpecialUuid(moniker)
  val specialPhrase : String = deriveSpecialPhrase(moniker)

  // This doesn't work because special_* fields don't exist in the original
  // schema dervied from the JSON files. We're ADDING these columns after the
  // JSON read and then populating their values dynamically.
  record.special_date = specialDate
  record.special_uuid = specialUuid
  record.special_phrase = specialPhrase
})

知道如何实现吗?

【问题讨论】:

  • 所以基本上你想通过调用每个函数来添加三列?\
  • 嗨@ShankarKoirala (+1) - 没错!

标签: apache-spark


【解决方案1】:

我会使用 spark 的 udf(用户定义函数)将原始数据集增强为 3 列

val deriveSpecialDate = udf((moniker: String) => // implement here)
val deriveSpecialUuid= udf((moniker: String) => // implement here)
val deriveSpecialPhrase = udf((moniker: String) => // implement here)

之后你可以这样做:

ds.withColumn("special_date", deriveSpecialDate(col("moniker)))
.withColumn("special_uuid", deriveSpecialUuid(col("moniker)))
.withColumn("special_phrase", deriveSpecialPhrase (col("moniker)))

它将为您带来一个包含三列的新数据框。如果你愿意,你也可以使用 map 函数转换成数据集

【讨论】:

  • 谢谢@dumitru (+1) - 这个解决方案 (ds.withColumn...) 是否也会保留数据集的原始列?原始数据集中有 5 个 cols,我想再添加 3 个(总共 8 个)。或者您是否将架构 更改 为其中只有 3 列(删除原来的 5 列)?
  • 它也会保留列,你必须显式调用 drop 来删除特定的列
【解决方案2】:

要创建一个新列,您可以使用 withColumn。如果你已经有一个函数,你需要将该函数注册为一个UDF(用户定义函数)

val sd = sqlContext.udf.register("deriveSpecialDate",deriveSpecialDate _ )
val su = sqlContext.udf.register("deriveSpecialUuid",deriveSpecialUuid _ )
val sp = sqlContext.udf.register("deriveSpecialPhrase", deriveSpecialPhrase _)

要使用这个 udf,你需要 withcolumn 来创建一个新列

ds.withColumn("special_date", sd($"moniker))
 .withColumn("special_uuid", su($"moniker))
 .withColumn("special_phrase", sp($"moniker))

这样,您将获得包含三个新添加列的原始数据集。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-09-05
    • 1970-01-01
    • 1970-01-01
    • 2020-04-10
    • 2019-01-28
    • 2018-05-08
    • 1970-01-01
    相关资源
    最近更新 更多