【问题标题】:Rename nested struct columns in a Spark DataFrame [duplicate]重命名 Spark DataFrame 中的嵌套结构列 [重复]
【发布时间】:2019-08-17 03:33:16
【问题描述】:

我正在尝试更改 scala 中 DataFrame 列的名称。我可以轻松更改直接字段的列名,但在转换数组结构列时遇到了困难。

下面是我的 DataFrame 架构。

|-- _VkjLmnVop: string (nullable = true)
|-- _KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
 |    |-- UvwXyz: struct (nullable = true)
 |    |    |-- _MnoPqrstUv: string (nullable = true)
 |    |    |-- _ManDevyIxyz: string (nullable = true)

但我需要像下面这样的架构

|-- vkj_lmn_vop: string (nullable = true)
|-- ka_tas_lop: string (nullable = true)
|-- abc_def: struct (nullable = true)
 |    |-- uvw_xyz: struct (nullable = true)
 |    |    |-- mno_pqrst_uv: string (nullable = true)
 |    |    |-- man_devy_ixyz: string (nullable = true)

对于非结构列,我在下面更改列名

def aliasAllColumns(df: DataFrame): DataFrame = {
  df.select(df.columns.map { c =>
    df.col(c)
      .as(
        c.replaceAll("_", "")
          .replaceAll("([A-Z])", "_$1")
          .toLowerCase
          .replaceFirst("_", ""))
  }: _*)
}
aliasAllColumns(file_data_df).show(1)

如何动态更改 Struct 列名?

【问题讨论】:

  • 你有像 Maps(_VkjLmnVop => vkj_lmn_vop, _KaTasLop => ka_tas_lop ) 这样的重命名列吗?。
  • @stack0114106,我有很多专栏。所以我正在考虑动态更改列名。

标签: scala apache-spark dataframe column-alias


【解决方案1】:

您可以创建一个递归方法来遍历 DataFrame 模式以重命名列:

import org.apache.spark.sql.types._

def renameAllCols(schema: StructType, rename: String => String): StructType = {
  def recurRename(schema: StructType): Seq[StructField] = schema.fields.map{
      case StructField(name, dtype: StructType, nullable, meta) =>
        StructField(rename(name), StructType(recurRename(dtype)), nullable, meta)
      case StructField(name, dtype: ArrayType, nullable, meta) if dtype.elementType.isInstanceOf[StructType] =>
        StructField(rename(name), ArrayType(StructType(recurRename(dtype.elementType.asInstanceOf[StructType])), true), nullable, meta)
      case StructField(name, dtype, nullable, meta) =>
        StructField(rename(name), dtype, nullable, meta)
    }
  StructType(recurRename(schema))
}

使用以下示例对其进行测试:

import org.apache.spark.sql.functions._
import spark.implicits._

val renameFcn = (s: String) =>
  s.replace("_", "").replaceAll("([A-Z])", "_$1").toLowerCase.dropWhile(_ == '_')

case class C(A_Bc: Int, D_Ef: Int)

val df = Seq(
  (10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
  (20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")

val newDF = spark.createDataFrame(df.rdd, renameAllCols(df.schema, renameFcn))

newDF.printSchema
// root
//  |-- vkj_lmn_vop: integer (nullable = false)
//  |-- ka_tas_lop: string (nullable = true)
//  |-- abc_def: struct (nullable = true)
//  |    |-- a_bc: integer (nullable = false)
//  |    |-- d_ef: integer (nullable = false)
//  |-- arr_struct: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a_bc: integer (nullable = false)
//  |    |    |-- d_ef: integer (nullable = false)
//  |-- arr_int: array (nullable = true)
//  |    |-- element: integer (containsNull = false)

【讨论】:

  • 嗨,Leo,这就是我要找的。这真是很棒的东西。非常感谢。我接受这个答案。
  • 我不知道它是如何工作的。我试过了,但它并没有只改变第一级架构的嵌套列。
  • 哦,如果列是结构数组,我想它不起作用
  • @Dave,感谢您的反馈。没错,它没有涵盖 StructType 的嵌套 Array 元素的情况。我已经修改了解决此类情况的解决方案。
  • 重命名架构的好方法。
【解决方案2】:

据我所知,无法直接重命名嵌套字段。

从一侧,您可以尝试移动到平坦的物体。

不过,如果你需要保持结构,你可以玩spark.sql.functions.struct(*cols)

Creates a new struct column.
Parameters: cols – list of column names (string) or list of Column expressions

您需要分解所有架构,生成所需的别名,然后使用struct 函数再次组合它。

这不是最好的解决方案。但它的东西:)

Pd:我附上了 PySpark 文档,因为它包含比 Scala 更好的解释。

【讨论】:

  • 但是我有 10 个结构列,每个结构中有 18 个属性?还有其他更好的方法吗?
  • 我建议你编写一个尾递归函数,给定一个模式,它会生成所有替换/结构方法。
猜你喜欢
  • 2021-12-18
  • 2018-12-22
  • 2019-06-13
  • 2020-09-27
  • 1970-01-01
  • 1970-01-01
  • 2019-08-16
  • 2016-06-06
  • 1970-01-01
相关资源
最近更新 更多