【Title】: Renaming columns recursively in a nested structure in Spark
【Posted】: 2018-12-22 02:50:30
【Question】:

I'm trying to replace certain characters in all the columns of my DataFrame, which contains many nested struct types.

I tried processing the schema fields recursively, but for some reason it only renames the fields at the top level, even though the recursion does reach the leaf nodes.

Specifically, I'm trying to replace the : character in column names with _.

Here is the Scala code I wrote:

// Imports assumed for the snippet below; SparkLauncher is the application's
// own helper that exposes the SparkSession.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}
import org.slf4j.LoggerFactory

class UpdateSchema {
  
  val logger = LoggerFactory.getLogger(classOf[UpdateSchema])
  
  Logger.getLogger("org").setLevel(Level.OFF)
  
  Logger.getLogger("akka").setLevel(Level.OFF)
  
  val sparkSession = SparkLauncher.spark

  import sparkSession.implicits._   

  def updateSchema(filePath:String):Boolean ={
    logger.info(".updateSchema() : filePath ={}",filePath);
    logger.info(".updateSchema() : sparkSession ={}",sparkSession);
    if(sparkSession!=null){
      var xmlDF = sparkSession
                  .read
                  .format("com.databricks.spark.xml")
                  .option("rowTag","ns:fltdMessage")
                  .option("inferschema","true")
                  .option("attributePrefix","attr_")
                  .load(filePath)
                  .toDF()
      
      xmlDF.printSchema()
      val updatedDF = renameDataFrameColumns(xmlDF.toDF()) 
      updatedDF.printSchema()
    }
    else
      logger.info(".updateSchema(): Spark Session is NULL !!!");
    false;
  }


    def replaceSpecialChars(str:String):String ={
          val newColumn:String =  str.replaceAll(":", "_")
          //logger.info(".replaceSpecialChars() : Old Column Name =["+str+"] New Column Name =["+newColumn+"]")
          return newColumn
      }
      
      def renameColumn(df:DataFrame,colName:String,prefix:String):DataFrame ={
        val newColuName:String = replaceSpecialChars(colName)
        logger.info(".renameColumn(): prefix=["+prefix+"] colName=["+colName+"] New Column Name=["+newColuName+"]")
        if(prefix.equals("")){
          if(df.col(colName)!=null){
            return df.withColumnRenamed(colName, replaceSpecialChars(colName))
          }
          else{
            logger.error(".logSchema() : Column ["+prefix+"."+colName+"] Not found in DataFrame !! ")
            logger.info("Prefix ="+prefix+" Existing Columns =["+df.columns.mkString("),(")+"]")
            throw new Exception("Unable to find Column ["+prefix+"."+colName+"]")
          }
        }
        else{
          if(df.col(prefix+"."+colName)!=null){
            return df.withColumnRenamed(prefix+"."+colName, prefix+"."+replaceSpecialChars(colName))
          }
          else{
            logger.error(".logSchema() : Column ["+prefix+"."+colName+"] Not found in DataFrame !! ")
            logger.info("Prefix ="+prefix+" Existing Columns =["+df.columns.mkString("),(")+"]")
            throw new Exception("Unable to find Column ["+prefix+"."+colName+"]")
          }
        }
      }
      
      def getStructType(schema:StructType,fieldName:String):StructType = {
        schema.fields.foreach(field => {
              field.dataType match{
                case st:StructType => {
                  logger.info(".getStructType(): Current Field Name =["+field.name.toString()+"] Checking for =["+fieldName+"]")
                  if(field.name.toString().equals(fieldName)){
                    return field.dataType.asInstanceOf[StructType]
                  }
                  else{
                    getStructType(st,fieldName)
                  }
                }
                case _ =>{
                  logger.info(".getStructType(): Non Struct Type. Ignoring Filed=["+field.name.toString()+"]");
                }
              }
          })
          throw new Exception("Unable to find Struct Type for filed Name["+fieldName+"]")
      }
      
      def processSchema(df:DataFrame,schema:StructType,prefix:String):DataFrame ={
        var updatedDF:DataFrame =df
        schema.fields.foreach(field =>{
          field.dataType match {
            case st:StructType => {
                logger.info(".processSchema() : Struct Type =["+st+"]");
                logger.info(".processSchema() : Field Data Type =["+field.dataType+"]");
                logger.info(".processSchema() : Renaming the Struct Field =["+field.name.toString()+"] st=["+st.fieldNames.mkString(",")+"]") 
                updatedDF = renameColumn(updatedDF,field.name.toString(),prefix)
                logger.info(".processSchema() : Column List after Rename =["+updatedDF.columns.mkString(",")+"]")
               // updatedDF.schema.fields.foldLeft(z)(op)
                val renamedCol:String = replaceSpecialChars(field.name.toString())
                var fieldType:DataType = null;
                //if(prefix.equals(""))
                fieldType = schema.fields.find(f =>{ (f.name.toString().equals(field.name.toString()))}).get.dataType
                
                if(prefix.trim().equals("") 
                    //&& fieldType.isInstanceOf[StructType]
                    ){
                  updatedDF = processSchema(updatedDF,
                      getStructType(updatedDF.schema,renamedCol),
                      replaceSpecialChars(field.name.toString()))
                }
                else{
                  updatedDF = processSchema(updatedDF,
                      getStructType(updatedDF.schema,renamedCol),
                      prefix+"."+replaceSpecialChars(field.name.toString()))
                }
              }
            case _ => {
              updatedDF = renameColumn(updatedDF,field.name.toString(),prefix)
            }
          }
        })
        //updatedDF.printSchema()
        
        
        return updatedDF
      }
      
      def renameDataFrameColumns(df:DataFrame):DataFrame ={
        val schema = df.schema;
        return processSchema(df,schema,"")
      }
}

【Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    Here is a recursive method that rewrites the DataFrame schema, using replaceAll to rename any column whose name contains the substring to be replaced:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    
    def renameAllColumns(schema: StructType, from: String, to: String): StructType = {
      def recurRename(schema: StructType, from: String, to:String): Seq[StructField] =
        schema.fields.map{
          case StructField(name, dtype: StructType, nullable, meta) =>
            StructField(name.replaceAll(from, to), StructType(recurRename(dtype, from, to)), nullable, meta)
          case StructField(name, dtype: ArrayType, nullable, meta) => dtype.elementType match {
              case struct: StructType => StructField(name.replaceAll(from, to), ArrayType(StructType(recurRename(struct, from, to)), true), nullable, meta)
              case other => StructField(name.replaceAll(from, to), other, nullable, meta)
            }
          case StructField(name, dtype, nullable, meta) =>
            StructField(name.replaceAll(from, to), dtype, nullable, meta)
        }
    
      StructType(recurRename(schema, from, to))
    }
    

    Testing the method on a sample DataFrame with nested structures:

    case class M(i: Int, `p:q`: String)
    case class N(j: Int, m: M)
    
    val df = Seq(
      (1, "a", Array(N(7, M(11, "x")), N(72, M(112, "x2")))),
      (2, "b", Array(N(8, M(21, "y")))),
      (3, "c", Array(N(9, M(31, "z"))))
    ).toDF("c1", "c2:0", "c3")
    
    df.printSchema
    // root
    //  |-- c1: integer (nullable = false)
    //  |-- c2:0: string (nullable = true)
    //  |-- c3: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- j: integer (nullable = false)
    //  |    |    |-- m: struct (nullable = true)
    //  |    |    |    |-- i: integer (nullable = false)
    //  |    |    |    |-- p:q: string (nullable = true)
    
    val newSchema = renameAllColumns(df.schema, ":", "_")
    
    spark.createDataFrame(df.rdd, newSchema).printSchema
    // root
    //  |-- c1: integer (nullable = false)
    //  |-- c2_0: string (nullable = true)
    //  |-- c3: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- j: integer (nullable = false)
    //  |    |    |-- m: struct (nullable = true)
    //  |    |    |    |-- i: integer (nullable = false)
    //  |    |    |    |-- p_q: string (nullable = true)
    

    Note that since replaceAll supports regex patterns, the method can be applied with more general replacement conditions. For example, here is how to trim column names from the ':' character onward:

    val newSchema = renameAllColumns(df.schema, """:.*""", "")
    
    spark.createDataFrame(df.rdd, newSchema).printSchema
    // root
    //  |-- c1: integer (nullable = false)
    //  |-- c2: string (nullable = true)
    //  |-- c3: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- j: integer (nullable = false)
    //  |    |    |-- m: struct (nullable = true)
    //  |    |    |    |-- i: integer (nullable = false)
    //  |    |    |    |-- p: string (nullable = true)
    

    【Discussion】:

      【Solution 2】:

      Unfortunately, you can't easily rename a single nested field with withColumnRenamed the way you are trying to. The only way I know of to rename nested fields is to cast the field to a type that has the same structure and data types but new field names. This has to be done on the top-level field, so you need to handle all of its fields at once. Here is an example:

      Create some input data:

      case class InnerRecord(column1: String, column2: Int)
      case class Record(field: InnerRecord)
      
      val df = Seq(
          Record(InnerRecord("a", 1)),
          Record(InnerRecord("b", 2))
      ).toDF
      
      df.printSchema
      

      The input data looks like this:

      root
       |-- field: struct (nullable = true)
       |    |-- column1: string (nullable = true)
       |    |-- column2: integer (nullable = false)
      

      Here is an example using withColumnRenamed. You'll notice in the output that it doesn't actually do anything!

      val updated = df.withColumnRenamed("field.column1", "field.newname")
      updated.printSchema
      
      root
       |-- field: struct (nullable = true)
       |    |-- column1: string (nullable = true)
       |    |-- column2: integer (nullable = false)
      

      Instead, you can do it with a cast, as follows. The function recursively recreates the nested field types while updating the names. In my case I simply replaced "column" with "col_". I also only ran it on one field here, but you can easily loop over all the fields in the schema (a sketch of such a loop follows the output below).

      import org.apache.spark.sql.types._
      
      def rename(dataType: DataType): DataType = dataType match {
          case StructType(fields) => 
              StructType(fields.map { 
                  case StructField(name, dtype, nullable, meta) => 
                      val newName = name.replace("column", "col_")
                      StructField(newName, rename(dtype), nullable, meta)
              })
      
          case _ => dataType
      }
      
      
      val fDataType = df.schema.filter(_.name == "field").head.dataType
      val updated = df.withColumn("field", $"field".cast(rename(fDataType)))
      updated.printSchema
      

      Which prints:

      root
       |-- field: struct (nullable = true)
       |    |-- col_1: string (nullable = true)
       |    |-- col_2: integer (nullable = false)
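
      As noted above, extending this to every top-level field only requires a loop. A minimal sketch, assuming the rename function and the df defined in this answer (the fold and variable names are illustrative, not part of the original answer):

      import org.apache.spark.sql.functions.col

      // Re-cast every top-level column with its (recursively renamed) data type.
      // Casting to a type with identical structure but new field names renames
      // the nested fields; non-struct columns are cast to their own type, a no-op.
      val renamedAll = df.schema.fields.foldLeft(df) { (acc, f) =>
        acc.withColumn(f.name, col(f.name).cast(rename(f.dataType)))
      }
      renamedAll.printSchema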
      

      【Discussion】:

      • Thanks for the input, Ryan. Really appreciate your help.
      【Solution 3】:

      I had some issues with @Leo C's answer, so I used a slightly modified variation instead. It also accepts an arbitrary mapping function f for the renaming.

      def renameAllColumns(schema: StructType, f: String => String): StructType = {
        def recurRename(schema: StructType, f: String => String): Seq[StructField] =
          schema.fields.map{
            case StructField(name, dtype: StructType, nullable, meta) =>
              StructField(f(name), StructType(recurRename(dtype, f)), nullable, meta)
            case StructField(name, dtype: ArrayType, nullable, meta) => dtype.elementType match {
                case struct: StructType => StructField(f(name), ArrayType(StructType(recurRename(struct, f)), true), nullable, meta)
                case other => StructField(f(name), ArrayType(other), nullable, meta)
              }
            case StructField(name, dtype, nullable, meta) =>
              StructField(f(name), dtype, nullable, meta)
          }
      
        StructType(recurRename(schema, f))
      }
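
      For example, reusing the df from Solution 1, the function can be called with an arbitrary lambda. A brief usage sketch (the replacement rule shown is just the one from the question):

      // Rename every column, at any nesting level, by replacing ':' with '_',
      // then apply the rewritten schema to the existing rows.
      val newSchema = renameAllColumns(df.schema, name => name.replaceAll(":", "_"))
      spark.createDataFrame(df.rdd, newSchema).printSchema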
      

      【Discussion】:
