【问题标题】:Spark Dataframe select column using caseSpark Dataframe 选择列使用案例
【发布时间】:2017-08-11 03:25:09
【问题描述】:

我想实现以下内容
例如我有 Emp 文件(2 个文件) 我只想选择 2 列,例如 Empid 和 EmpName 如果文件没有 EmpName 它应该选择 Empid 数据框的一列

1) Emp1.csv(文件)

Empid   EmpName Dept
1       ABC     IS
2       XYZ     COE

2) Emp.csv(文件)

 Empid  EmpName
 1      ABC
 2      XYZ

代码尝试到现在

scala>  val SourceData = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("delimiter", ",").option("header", "true").load("/root/Empfiles/")
SourceData: org.apache.spark.sql.DataFrame = [Empid: string, EmpName: string ... 1 more field]

scala> SourceData.printSchema
root
|-- Empid: string (nullable = true)
|-- EmpName: string (nullable = true)
|-- Dept: string (nullable = true)

如果指定文件的所有列名,则此代码有效

 scala> var FormatedColumn = SourceData.select(
 |             SourceData.columns.map {
| case "Empid"                     => SourceData("Empid").cast(IntegerType).as("empid")
 | case "EmpName"                     => SourceData("EmpName").cast(StringType).as("empname")
 | case "Dept"                     => SourceData("Dept").cast(StringType).as("dept")
 | }: _*
 | )
 FormatedColumn: org.apache.spark.sql.DataFrame = [empid: int, empname: string ... 1 more field]

但我只想要特定的 2 列它会失败(如果列可用,它会显示选择并更改数据类型和列名)

 scala> var FormatedColumn = SourceData.select(
 | SourceData.columns.map {
 | case "Empid"                     => SourceData("Empid").cast(IntegerType).as("empid")
 | case "EmpName"                     => SourceData("EmpName").cast(StringType).as("empname")
 | }: _*
 | )
 scala.MatchError: Dept (of class java.lang.String)
 at $anonfun$1.apply(<console>:32)
 at $anonfun$1.apply(<console>:32)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  ... 53 elided

【问题讨论】:

  • 它会抛出一个 MatchError,因为它在你的 map 函数中找不到匹配的大小写。如果您添加默认情况,它应该会成功。您也可以先在“Empid”和“EmpName”上运行选择,然后格式化列。
  • 对不起,我是 scala 的新手,默认情况下我不想做任何事情,所以我应该写什么?

标签: scala apache-spark dataframe apache-spark-sql spark-dataframe


【解决方案1】:

所有其他列也需要匹配:

var formattedColumn = sourceData.select(
  sourceData.columns.map {
      case "Empid"   => sourceData("Empid").cast(IntegerType).as("empid")
      case "EmpName" => sourceData("EmpName").cast(StringType).as("empname")
      case other: String => sourceData(other)
  }: _*
)

更新 1。如果只想选择“Empid”和“EmpName”两列,则无需使用匹配器:

val formattedColumn = sourceData.select(
  sourceData("Empid").cast(IntegerType).as("empid"),
  sourceData("EmpName").cast(StringType).as("empname")
)

更新 2。如果您想根据它们的存在来选择列,我可以建议以下内容:

val colEmpId = "Empid"
val colEmpName = "EmpName"
// list of possible expected column names
val selectableColums = Seq(colEmpId, colEmpName)
// take only the ones that are in the list
val foundColumns = sourceData.columns.filter(column => selectableColums.contains(column))
// create the target dataframe
val formattedColumn = sourceData.select(
  foundColumns.map(column =>
    column match {
      case colEmpId   => sourceData(colEmpId).cast(IntegerType).as("empid")
      case colEmpName => sourceData(colEmpName).cast(StringType).as("empname")
      case _ => throw new IllegalArgumentException("Unexpected column: " + column)
    }
  ): _*
)

附言请为vals 和vars 使用传统的驼峰命名法。

【讨论】:

  • scala> FormattedColumn.printSchema root |-- empid: integer (nullable = true) |-- empname: string (nullable = true) |-- Dept: string (nullable = true) 我不想要在 formattedcolumn 数据框中选择部门我只想要 2 列
  • 不,如果文件没有 EmpName,它不会工作,它应该选择 Empid 数据帧的一列,在这种情况下它会失败
【解决方案2】:

如果你用这个查询替换你的语句,它应该可以工作。 它会过滤掉所有不属于 match 子句的列。这样可以避免您看到的 MatchError。

df.select($"Empid", $"EmpName").select(df.columns.map {
    case "Empid" => df("Empid").cast(IntegerType).as("empid")
    case "EmpName" => df("EmpName").cast(StringType).as("empname")
}: _*)

【讨论】:

  • 不,它不起作用我已经尝试过您的代码 :32:错误:重载方法值选择与替代方案:无法应用于 (Array[org.apache.spark.sql.Column]) df.select($"Empid", $"EmpName").select(df.columns.map {
  • 我想我错过了“:_*”。
  • 如果文件没有 EmpName 它应该选择 Empid 数据框的一列我认为它不满足这一点
  • 是的,你是对的。它只会避免您看到的 MatchError。
【解决方案3】:

我不知道为什么这么复杂..

为什么不这样做?

df
  .withColumn("empid", $"EmpId".cast(IntegerType))
  .withColumn("empname", $"EmpName".cast(StringType))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多