【问题标题】:How do I detect if a Spark DataFrame has a column如何检测 Spark DataFrame 是否有列
【发布时间】:2016-06-24 13:38:21
【问题描述】:

当我在 Spark SQL 中从 JSON 文件创建 DataFrame 时,如何在调用 .select 之前判断给定列是否存在

示例 JSON 架构:

{
  "a": {
    "b": 1,
    "c": 2
  }
}

这就是我想做的:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

但我找不到hasColumn 的好函数。我得到的最接近的是测试列是否在这个有点尴尬的数组中:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    假设它存在并让它以Try 失败。简单明了,支持任意嵌套:

    import scala.util.Try
    import org.apache.spark.sql.DataFrame
    
    def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
    
    val df = sqlContext.read.json(sc.parallelize(
      """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))
    
    hasColumn(df, "foobar")
    // Boolean = false
    
    hasColumn(df, "foo")
    // Boolean = true
    
    hasColumn(df, "foo.bar")
    // Boolean = true
    
    hasColumn(df, "foo.bar.foobar")
    // Boolean = true
    
    hasColumn(df, "foo.bar.foobaz")
    // Boolean = false
    

    甚至更简单:

    val columns = Seq(
      "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")
    
    columns.flatMap(c => Try(df(c)).toOption)
    // Seq[org.apache.spark.sql.Column] = List(
    //   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
    

    Python 等价物:

    from pyspark.sql.utils import AnalysisException
    from pyspark.sql import Row
    
    
    def has_column(df, col):
        try:
            df[col]
            return True
        except AnalysisException:
            return False
    
    df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()
    
    has_column(df, "foobar")
    ## False
    
    has_column(df, "foo")
    ## True
    
    has_column(df, "foo.bar")
    ## True
    
    has_column(df, "foo.bar.foobar")
    ## True
    
    has_column(df, "foo.bar.foobaz")
    ## False
    

    【讨论】:

    • 这也适用于结构化字段。使用contains 函数的解决方案没有! +1
    • 乍一看,df(path)df[col] 似乎是一个非常昂贵的测试,但这都是懒惰的 dag 构建,所以它很便宜,对吗?
    • @Davos 这与懒惰无关。 Columns 不是数据容器,而是查询描述模型的组成部分。在更广泛的背景下,要了解有关 Dataset 的任何信息,您需要处理并检查其逻辑 QueryExectution.analyzed 计划,该计划适用于 col / applyresolve)或 schema / columns 等。跨度>
    • @10465355saysReinstateMonica 谢谢这正是我的意思。我指的不是 Scala 中的 lazy 指令,而是 Spark 如何创建逻辑计划、将其转换为物理计划然后在集群上执行任务的分阶段方法。如果这是在逻辑计划阶段解决,那么它很便宜。
    • @zero323 python 等效项不适用于数组内的结构列。例如'{ "name": "test", "address":[{"houseNumber":"1234"}]}' 应该使用 df.select 而不是 df[col]
    【解决方案2】:

    我通常使用的另一个选项是

    df.columns.contains("column-name-to-check")
    

    这会返回一个布尔值

    【讨论】:

    • 是的,没错,它不适用于嵌套列。
    • 尝试:: df.columns.__contains__("column-name-to-check")
    【解决方案3】:

    实际上你甚至不需要调用 select 来使用列,你可以在数据框本身上调用它

    // define test data
    case class Test(a: Int, b: Int)
    val testList = List(Test(1,2), Test(3,4))
    val testDF = sqlContext.createDataFrame(testList)
    
    // define the hasColumn function
    def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)
    
    // then you can just use it on the DF with a given column name
    hasColumn(testDF, "a")  // <-- true
    hasColumn(testDF, "c")  // <-- false
    

    或者,您可以使用 pimp my library 模式定义一个隐式类,以便直接在您的数据帧上使用 hasColumn 方法

    implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
        def hasColumn(colName: String) = df.columns.contains(colName)
    }
    

    那么你可以把它当作:

    testDF.hasColumn("a") // <-- true
    testDF.hasColumn("c") // <-- false
    

    【讨论】:

    • 这不适用于嵌套列。来自 json {"a":{"b":1,"c":0}}
    【解决方案4】:

    Try 不是最优的,因为它会在做出决定之前评估 Try 内部的表达式。

    对于大型数据集,请在Scala 中使用以下内容:

    df.schema.fieldNames.contains("column_name")
    

    【讨论】:

    • 我同意 Try 不是最佳选择。我要做的是:我用字段数组创建一个列,然后用array_contains 进行测试:val fields = df.schema.fieldNames; df.withColumn("fields",lit(fields)).withColumn("has_column", when(array_contains($"fields","field1"),lit(true))) ```
    【解决方案5】:

    对于那些在寻找 Python 解决方案时偶然发现的人,我使用:

    if 'column_name_to_check' in df.columns:
        # do something
    

    当我使用 Python 尝试@Jai Prakash 对df.columns.contains('column-name-to-check') 的回答时,我得到了AttributeError: 'list' object has no attribute 'contains'

    【讨论】:

    • 这种在数据框中检查 col 的方式会减慢 spark 处理速度吗?
    • @vasista 你能帮我更好地理解你的问题吗?是否有其他方法可以用来比较性能?
    【解决方案6】:

    您的另一个选择是对df.columns 和您的potential_columns 进行一些数组操作(在本例中为intersect)。

    // Loading some data (so you can just copy & paste right into spark-shell)
    case class Document( a: String, b: String, c: String)
    val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
    
    // The columns we want to extract
    val potential_columns = Seq("b", "c", "d")
    
    // Get the intersect of the potential columns and the actual columns, 
    // we turn the array of strings into column objects
    // Finally turn the result into a vararg (: _*)
    df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
    

    唉,这对你上面的内部对象场景不起作用。您需要查看架构。

    我要将您的 potential_columns 更改为完全限定的列名

    val potential_columns = Seq("a.b", "a.c", "a.d")
    
    // Our object model
    case class Document( a: String, b: String, c: String)
    case class Document2( a: Document, b: String, c: String)
    
    // And some data...
    val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
    
    // We go through each of the fields in the schema.
    // For StructTypes we return an array of parentName.fieldName
    // For everything else we return an array containing just the field name
    // We then flatten the complete list of field names
    // Then we intersect that with our potential_columns leaving us just a list of column we want
    // we turn the array of strings into column objects
    // Finally turn the result into a vararg (: _*)
    df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
    

    这只深入了一层,因此要使其通用,您必须做更多的工作。

    【讨论】:

      【解决方案7】:

      在 pyspark 中你可以简单地运行

      df.columns 中的“字段”

      【讨论】:

        【解决方案8】:

        如果您在加载 json 时使用架构定义对其进行粉碎,则无需检查该列。如果它不在 json 源中,它将显示为空列。

                val schemaJson = """
          {
              "type": "struct",
              "fields": [
                  {
                    "name": field1
                    "type": "string",
                    "nullable": true,
                    "metadata": {}
                  },
                  {
                    "name": field2
                    "type": "string",
                    "nullable": true,
                    "metadata": {}
                  }
              ]
          }
                """
            val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
        
            val djson = sqlContext.read
            .schema(schema )
            .option("badRecordsPath", readExceptionPath)
            .json(dataPath)
        

        【讨论】:

          【解决方案9】:

          对于嵌套列,您可以使用

          df.schema.simpleString().find('column_name')
          

          【讨论】:

          • 如果您有一个名称类似于架构字符串中无关文本的列,这对我来说似乎不可靠。
          【解决方案10】:
          def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
            Try(df.select(colName)).isSuccess
          

          使用上面提到的函数来检查包含嵌套列名的列是否存在。

          【讨论】:

            【解决方案11】:

            在 PySpark 中,df.columns 为您提供数据框中的列列表,因此 df.columns 中的“colName” 将返回 True 或 False。试一试。祝你好运!

            【讨论】:

            • df1.columns 显示 ['bankData', 'reference', 'submissionTime'];但是 df1['bankData']['userAddress'].columns 显示 Column,没有显示结构,我错过了什么吗?
            猜你喜欢
            • 2018-07-29
            • 1970-01-01
            • 1970-01-01
            • 2018-01-29
            • 1970-01-01
            • 1970-01-01
            • 2021-12-12
            • 2021-12-24
            • 1970-01-01
            相关资源
            最近更新 更多