【Title】: Dropping a nested column from Spark DataFrame
【Posted】: 2015-12-20 01:05:31
【Question】:

I have a DataFrame with the following schema:

root
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)

While I can filter the DataFrame using
  val data = rawData
     .filter( !(rawData("features.feat1") <=> "100") )

I am unable to drop a column using
  val data = rawData
       .drop("features.feat1")

Am I doing something wrong here? I also tried (unsuccessfully) doing drop(rawData("features.feat1")), though it doesn't make much sense to do so.

Thanks in advance,

Nikhil

【Comments】:

  • What if you map it to a new DataFrame? I don't think the DataFrame API lets you drop a struct field inside a struct-typed column.
  • Hmm. I'll give it a try, but it seems inconvenient to have to map just to address nested column names this way :(.
  • You can always get all columns with the DataFrame's .columns() method, remove the unwanted column from the sequence and do select(myColumns:_*). Should be a bit shorter.

Tags: scala apache-spark dataframe apache-spark-sql apache-spark-ml


【Solution 1】:

This is just a programming exercise, but you can try something like this:

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {
  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
  }

  def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
  }
}

Example usage:

scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features

scala> case class record(label: String, features: features)
defined class record

scala> val df = sc.parallelize(Seq(record("a_label",  features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
|  label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+


scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+


scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
|  label|  features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+

Add an implicit conversion and you're good to go.

【Discussion】:

  • I upvoted this answer because it works when all nested rows have a uniform schema; otherwise it doesn't work, it just returns the original DataFrame.
  • The problem seems to lie in the getOrElse part of the statement: if any exception is thrown, it is not printed and the "else" branch takes over and returns the original DataFrame. In my case, for example, case sensitivity was the issue: there were two columns with the same name but different casing.
【Solution 2】:

This version allows you to remove nested columns at any level:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}

/**
  * Various Spark utilities and extensions of DataFrame
  */
object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else {
      colType match {
        case colType: StructType =>
          if (dropColName.startsWith(s"${fullColName}.")) {
            Some(struct(
              colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
          } else {
            Some(col)
          }
        case other => Some(col)
      }
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }
}

Usage:

import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")

【Discussion】:

  • Thanks a lot! Any chance you could update it to drop a field from a struct inside an array within an array? Been hacking at it for more than a day, close but can't get it. i.e. parent:array>>>
  • @alexP_Keaton Hey, did you find a solution for dropping a column inside an array?
  • I'd like to add that this method does not preserve the "nullable" attribute of the modified parent struct. In this example, features becomes struct (nullable = false).
  • One way I found to solve this is to pass f.nullable into dropSubColumn and apply the construct when(col.isNotNull, newCol) to the result of struct(... :_*).
  • The df.drop(colName) bit isn't actually needed here. It doesn't work anyway (otherwise we could just drop nested columns with the API directly). Also, withColumn will replace an existing column of the given name with the new definition.
【Solution 3】:

Extending spektom's answer, with support for array types:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

object DataFrameUtils {

  private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                })
              : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
          }

        case other => Some(col)
      }
    } else {
      Some(col)
    }
  }

  protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df.drop(colName)) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  /**
    * Extended version of DataFrame that allows to operate on nested fields
    */
  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Drops nested field from DataFrame
      *
      * @param colName Dot-separated nested field name
      */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
  }

}

【Discussion】:

  • The call to struct in case colType: ArrayType needs to be wrapped in array, otherwise you "lose" the array containing the parent of the dropped column. Also, for some reason the remaining items in the innermost struct (the one with the dropped member) get converted to an array; I'm still debugging that.
  • @JeffEvans Did you find out why this happens (the field being converted to an array)? I've been debugging for a whole day and can't understand what Spark is doing there. Possibly a bug?
  • Not sure. The code we're dealing with might be incorrect, but I don't understand the internals of the Spark library code well enough to say for certain yet.
  • As @JeffEvans mentioned, there is a bug here that converts the remaining items to an array. To avoid it, you should do case Some(x) => Some(x.getItem(0).alias(f.name)) instead of case Some(x) => Some(x.alias(f.name)) in the ArrayType case.
  • Doing .getItem(0) before alias (in the ArrayType -> StructType case) does indeed "work". However, this really shouldn't be needed. I believe there is a bug in Spark itself, so I opened a Jira for it: issues.apache.org/jira/browse/SPARK-31779
【Solution 4】:

I'll expand on mmendez.semantic's answer here, and account for the issues described in the sub-thread.

  def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              // we are potentially dropping a column from within a struct, that is itself inside an array
              // Spark has some very strange behavior in this case, which they insist is not a bug
              // see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
              // and also the thread here: https://stackoverflow.com/a/39943812/375670
              // this is a workaround for that behavior

              // first, get all struct fields
              val innerFields = innerType.fields
              // next, create a new type for all the struct fields EXCEPT the column that is to be dropped
              // we will need this later
              val preserveNamesStruct = ArrayType(StructType(
                innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
              ))
              // next, apply dropSubColumn recursively to build up the new values after dropping the column
              val filteredInnerFields = innerFields.flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                }
              )
              // finally, use arrays_zip to unwrap the arrays that were introduced by building up the new, filtered
              // struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
              // to get the original names back
              Some(arrays_zip(filteredInnerFields:_*).cast(preserveNamesStruct))
          }

        case _ => Some(col)
      }
    } else {
      Some(col)
    }
  }

  def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields.flatMap(f => {
      if (colName.startsWith(s"${f.name}.")) {
        dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
          case Some(x) => Some((f.name, x))
          case None => None
        }
      } else {
        None
      }
    }).foldLeft(df.drop(colName)) {
      case (df, (colName, column)) => df.withColumn(colName, column)
    }
  }

Usage in spark-shell:

// if defining the functions above in your spark-shell session, you first need imports
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// now you can paste the function definitions

// create a deeply nested and complex JSON structure    
val jsonData = """{
      "foo": "bar",
      "top": {
        "child1": 5,
        "child2": [
          {
            "child2First": "one",
            "child2Second": 2,
            "child2Third": -19.51
          }
        ],
        "child3": ["foo", "bar", "baz"],
        "child4": [
          {
            "child2First": "two",
            "child2Second": 3,
            "child2Third": 16.78
          }
        ]
      }
    }"""

// read it into a DataFrame
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())

// remove a sub-column
val modifiedDf = dropColumn(df, "top.child2.child2First")

modifiedDf.printSchema
root
 |-- foo: string (nullable = true)
 |-- top: struct (nullable = false)
 |    |-- child1: long (nullable = true)
 |    |-- child2: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child2Second: long (nullable = true)
 |    |    |    |-- child2Third: double (nullable = true)
 |    |-- child3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- child4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child2First: string (nullable = true)
 |    |    |    |-- child2Second: long (nullable = true)
 |    |    |    |-- child2Third: double (nullable = true)


modifiedDf.show(truncate=false)
+---+------------------------------------------------------+
|foo|top                                                   |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+

【Discussion】:

  • I see we use "arrays_zip" in the "dropSubColumn" def, but on Spark 2.3 this won't work; is there an alternative? I tried creating a UDF as below, but had no luck using it: val zipped = udf((s: Seq[String], t: Seq[String]) => s zip t)
  • As I recall, this was working on 2.4. But I no longer have access to the underlying project code, so I'm not sure. It should be easy to test with spark-shell.
【Solution 5】:

Following spektom's Scala code snippet, I created similar code in Java. Since Java 8 doesn't have foldLeft, I used forEachOrdered instead. This code works on Spark 2.x (I used 2.1). I also noticed that dropping a column and then adding it back with withColumn under the same name doesn't work, so I simply replace the column, and that seems to work.

The code is not fully tested; hope it works :-)

public class DataFrameUtils {

public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
    final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
    Arrays.stream(dataFrame.schema().fields())
        .flatMap( f -> {
           if (columnName.startsWith(f.name() + ".")) {
               final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
               if (column.isPresent()) {
                   return Stream.of(new Tuple2<>(f.name(), column));
               } else {
                   return Stream.empty();
               }
           } else {
               return Stream.empty();
           }
        }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));

    return dataFrameFolder.getDF();
}

private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
    Optional<Column> column = Optional.empty();
    if (!fullColumnName.equals(dropColumnName)) {
        if (colType instanceof StructType) {
            if (dropColumnName.startsWith(fullColumnName + ".")) {
                column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName)));
            }
        } else {
            column = Optional.of(col);
        }
    }

    return column;
}

private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
    return Arrays.stream(colType.fields())
        .flatMap(f -> {
                    final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                            fullColumnName + "." + f.name(), dropColumnName);
                    if (column.isPresent()) {
                        return Stream.of(column.get().alias(f.name()));
                    } else {
                        return Stream.empty();
                    }
                }
        ).toArray(Column[]::new);

}

private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
    private Dataset<Row> df;

    public DataFrameFolder(Dataset<Row> df) {
        this.df = df;
    }

    public Dataset<Row> getDF() {
        return df;
    }

    @Override
    public void accept(Tuple2<String, Optional<Column>> colTuple) {
        if (!colTuple._2().isPresent()) {
            df = df.drop(colTuple._1());
        } else {
            df = df.withColumn(colTuple._1(), colTuple._2().get());
        }
    }
}
}

Usage example:

private class Pojo {
    private String str;
    private Integer number;
    private List<String> strList;
    private Pojo2 pojo2;

    public String getStr() {
        return str;
    }

    public Integer getNumber() {
        return number;
    }

    public List<String> getStrList() {
        return strList;
    }

    public Pojo2 getPojo2() {
        return pojo2;
    }

}

private class Pojo2 {
    private String str;
    private Integer number;
    private List<String> strList;

    public String getStr() {
        return str;
    }

    public Integer getNumber() {
        return number;
    }

    public List<String> getStrList() {
        return strList;
    }

}

SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");

Original structure:

root
 |-- number: integer (nullable = true)
 |-- pojo2: struct (nullable = true)
 |    |-- number: integer (nullable = true)
 |    |-- str: string (nullable = true)
 |    |-- strList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- str: string (nullable = true)
 |-- strList: array (nullable = true)
 |    |-- element: string (containsNull = true)

After the drop:

root
 |-- number: integer (nullable = true)
 |-- pojo2: struct (nullable = false)
 |    |-- number: integer (nullable = true)
 |    |-- strList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- str: string (nullable = true)
 |-- strList: array (nullable = true)
 |    |-- element: string (containsNull = true)

【Discussion】:

  • Add a simple example of how to call it and I'll upvote you.
  • Added a usage example per @xXxpRoGrAmmErxXx's request.
【Solution 6】:

Another (PySpark) way is to remove the features.feat1 column by creating features again:

from pyspark.sql.functions import col, arrays_zip

display(df
        .withColumn("features", arrays_zip("features.feat2", "features.feat3"))
        .withColumn("features", col("features").cast(schema))
)

where schema is the new schema (excluding features.feat1):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
      StructField('feat2', StringType(), True), 
      StructField('feat3', StringType(), True), 
    ]
  )

【Discussion】:

【Solution 7】:

PySpark implementation:

    from typing import List

    import pyspark.sql.functions as sf
    from pyspark.sql import Column, DataFrame
    from pyspark.sql.types import StructType
    
    def _drop_nested_field(
        schema: StructType,
        field_to_drop: str,
        parents: List[str] = None,
    ) -> Column:
        parents = list() if parents is None else parents
        src_col = lambda field_names: sf.col('.'.join(f'`{c}`' for c in field_names))
    
        if '.' in field_to_drop:
            root, subfield = field_to_drop.split('.', maxsplit=1)
            field_to_drop_from = next(f for f in schema.fields if f.name == root)
    
            return sf.struct(
                *[src_col(parents + [f.name]) for f in schema.fields if f.name != root],
                _drop_nested_field(
                    schema=field_to_drop_from.dataType,
                    field_to_drop=subfield,
                    parents=parents + [root]
                ).alias(root)
            )
    
        else:
            # select all columns except the one to drop
            return sf.struct(
                *[src_col(parents + [f.name])for f in schema.fields if f.name != field_to_drop],
            )
    
    
    def drop_nested_field(
        df: DataFrame,
        field_to_drop: str,
    ) -> DataFrame:
        if '.' in field_to_drop:
            root, subfield = field_to_drop.split('.', maxsplit=1)
            field_to_drop_from = next(f for f in df.schema.fields if f.name == root)
    
            return df.withColumn(root, _drop_nested_field(
                schema=field_to_drop_from.dataType,
                field_to_drop=subfield,
                parents=[root]
            ))
        else:
            return df.drop(field_to_drop)
    
    
    df = drop_nested_field(df, 'a.b.c.d')
    

【Discussion】:

  • What is sf in _drop_nested_field?
  • @prakharjain import pyspark.sql.functions as sf. I've updated the example.
【Solution 8】:

Adding a Java version of this solution.

Utility class (pass your dataset and the nested column to the dropNestedColumn function).

(There were some bugs in Lior Chaga's answer, which I corrected while trying to use it.)

    public class NestedColumnActions {
    private static final String DOT = ".";
    /*
    dataset : dataset in which we want to drop columns
    columnName : nested column that needs to be deleted
    */
    public static Dataset<?> dropNestedColumn(Dataset<?> dataset, String columnName) {
    
        //Special case of top level column deletion
        if(!columnName.contains("."))
            return dataset.drop(columnName);
    
        final DataSetModifier dataFrameFolder = new DataSetModifier(dataset);
        Arrays.stream(dataset.schema().fields())
                .flatMap(f -> {
                    //If the column name to be deleted starts with current top level column
                    if (columnName.startsWith(f.name() + DOT)) {
                        //Get new column structure under f , expected after deleting the required column
                        final Optional<Column> column = dropSubColumn(functions.col(f.name()), f.dataType(), f.name(), columnName);
                        if (column.isPresent()) {
                            return Stream.of(new Tuple2<>(f.name(), column));
                        } else {
                            return Stream.empty();
                        }
                    } else {
                        return Stream.empty();
                    }
                })
                //Call accept function with Tuples of (top level column name, new column structure under it)
                .forEach(colTuple -> dataFrameFolder.accept(colTuple));
    
        return dataFrameFolder.getDataset();
    }
    
    private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
        Optional<Column> column = Optional.empty();
        if (!fullColumnName.equals(dropColumnName)) {
            if (colType instanceof StructType) {
                if (dropColumnName.startsWith(fullColumnName + DOT)) {
                    column = Optional.of(functions.struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
                }
                else {
                    column = Optional.of(col);
                }
            } else {
                column = Optional.of(col);
            }
        }
    
        return column;
    }
    
    private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
        return Arrays.stream(colType.fields())
                .flatMap(f -> {
                            final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                                    fullColumnName + "." + f.name(), dropColumnName);
                            if (column.isPresent()) {
                                return Stream.of(column.get().alias(f.name()));
                            } else {
                                return Stream.empty();
                            }
                        }
                ).toArray(Column[]::new);
    
    }
    
    private static class DataSetModifier implements Consumer<Tuple2<String, Optional<Column>>> {
        private Dataset<?> df;
    
        public DataSetModifier(Dataset<?> df) {
            this.df = df;
        }
    
        public Dataset<?> getDataset() {
            return df;
        }
    
        /*
        colTuple[0]:top level column name
        colTuple[1]:new column structure under it
       */
        @Override
        public void accept(Tuple2<String, Optional<Column>> colTuple) {
            if (!colTuple._2().isPresent()) {
                df = df.drop(colTuple._1());
            } else {
                df = df.withColumn(colTuple._1(), colTuple._2().get());
            }
        }
    }
    

    }

【Discussion】:

【Solution 9】:

The Make Structs Easy* library makes it easy to perform operations like adding, removing, and renaming fields inside nested data structures. The library is available in both Scala and Python.

Suppose you have the following data:

      import org.apache.spark.sql.functions._
      
      case class Features(feat1: String, feat2: String, feat3: String)
      case class Record(features: Features, arrayOfFeatures: Seq[Features])
      
      val df = Seq(
         Record(Features("hello", "world", "!"), Seq(Features("red", "orange", "yellow"), Features("green", "blue", "indigo")))
      ).toDF
      
      df.printSchema
      
      // root
      //  |-- features: struct (nullable = true)
      //  |    |-- feat1: string (nullable = true)
      //  |    |-- feat2: string (nullable = true)
      //  |    |-- feat3: string (nullable = true)
      //  |-- arrayOfFeatures: array (nullable = true)
      //  |    |-- element: struct (containsNull = true)
      //  |    |    |-- feat1: string (nullable = true)
      //  |    |    |-- feat2: string (nullable = true)
      //  |    |    |-- feat3: string (nullable = true)
      
      df.show(false)
      
      // +-----------------+----------------------------------------------+
      // |features         |arrayOfFeatures                               |
      // +-----------------+----------------------------------------------+
      // |[hello, world, !]|[[red, orange, yellow], [green, blue, indigo]]|
      // +-----------------+----------------------------------------------+
      

Then dropping feat2 from features is as simple as:

      import com.github.fqaiser94.mse.methods._
      
      // drop feat2 from features
      df.withColumn("features", $"features".dropFields("feat2")).show(false)
      
      // +----------+----------------------------------------------+
      // |features  |arrayOfFeatures                               |
      // +----------+----------------------------------------------+
      // |[hello, !]|[[red, orange, yellow], [green, blue, indigo]]|
      // +----------+----------------------------------------------+
      

I noticed there were a lot of follow-up comments on other solutions asking whether there is a way to drop a column nested inside a struct that is itself inside an array. This can be done by combining functions from the Make Structs Easy library with those from the spark-hofs library, as follows:

      import za.co.absa.spark.hofs._
      
      // drop feat2 in each element of arrayOfFeatures
      df.withColumn("arrayOfFeatures", transform($"arrayOfFeatures", features => features.dropFields("feat2"))).show(false)
      
      // +-----------------+--------------------------------+
      // |features         |arrayOfFeatures                 |
      // +-----------------+--------------------------------+
      // |[hello, world, !]|[[red, yellow], [green, indigo]]|
      // +-----------------+--------------------------------+
      

*Full disclosure: I am the author of the Make Structs Easy library referenced in this answer.

【Discussion】:

【Solution 10】:

For Spark 3.1+, you can use the dropFields method on struct-type columns:

    An expression that drops fields in StructType by name. This is a no-op if the schema doesn't contain the field name(s).

        val df = sql("SELECT named_struct('feat1', 1, 'feat2', 2, 'feat3', 3) features")
        
        val df1 = df.withColumn("features", $"features".dropFields("feat1"))
        

【Discussion】:
