【发布时间】:2019-01-12 08:10:54
【问题描述】:
我已将 CSV 数据加载到 Spark DataFrame 中。
我需要将此数据帧分割成两个不同的数据帧,每个数据帧都包含一组来自原始数据帧的列。
如何根据列将子集选择到 Spark 数据框中?
【问题讨论】:
标签: scala apache-spark apache-spark-sql
我已将 CSV 数据加载到 Spark DataFrame 中。
我需要将此数据帧分割成两个不同的数据帧,每个数据帧都包含一组来自原始数据帧的列。
如何根据列将子集选择到 Spark 数据框中?
【问题讨论】:
标签: scala apache-spark apache-spark-sql
如果要将数据框拆分为两个不同的数据框,请使用所需的不同列对其进行两次选择。
val sourceDf = spark.read.csv(...)
val df1 = sourceDF.select("first column", "second column", "third column")
val df2 = sourceDF.select("first column", "second column", "third column")
请注意,这当然意味着 sourceDf 将被评估两次,因此如果它可以适合分布式内存并且您在两个数据帧中使用大部分列,那么缓存它可能是一个好主意。它有许多您不需要的额外列,然后您可以先对其进行选择以选择您需要的列,以便它将所有额外数据存储在内存中。
【讨论】:
cache() 或persist() 方法来获取缓存版本。 persist 允许您选择缓存级别,cache 只是调用以默认的仅内存缓存级别持久化。
有多个选项(尤其是在 Scala 中)可以选择该 Dataframe 的列子集。以下几行显示了选项,其中大部分都记录在 Column 的 ScalaDocs 中:
import spark.implicits._
import org.apache.spark.sql.functions.{col, column, expr}
inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))
// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol
// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)
// Special cases
col("columnName.field") // Extracting a struct field
col("`a.column.with.dots`") // Escape `.` in column names.
// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)
$ 的使用是可能的,因为 Scala 提供了一个隐式类,该类使用方法 $ 将字符串转换为列:
implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef {
def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = { /* compiled code */ }
}
通常,当您想要将一个 DataFrame 派生到多个 DataFrame 时,如果您在创建其他 DataFrame 之前persist 原始 DataFrame 可能会提高您的性能。最后你可以unpersist原始DataFrame。
请记住,列不会在编译时解析,而是仅在与查询执行的分析器阶段发生的目录的列名进行比较时解析。如果您需要更强的类型安全性,您可以创建一个Dataset。
为了完整起见,这里是尝试上述代码的 csv:
// csv file:
// colA,colB,colC
// 1,"foo","bar"
val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)
// resulting DataFrame schema
root
|-- colA: string (nullable = true)
|-- colB: string (nullable = true)
|-- colC: string (nullable = true)
【讨论】:
假设我们的父 Dataframe 有 'n' 列
我们可以创建 'x' 子数据帧(让我们考虑 2)。
子数据框的列可以根据需要从任何父数据框列中选择。
考虑源有 10 列,我们希望拆分为 2 个数据帧,其中包含从父数据帧引用的列。
子数据框的列可以使用 select 数据框 API
val parentDF = spark.read.format("csv").load("/path of the CSV file")
val Child1_DF = parentDF.select("col1","col2","col3","col9","col10").show()
val child2_DF = parentDF.select("col5", "col6","col7","col8","col1","col2").show()
请注意,子数据帧中的列数可能会有所不同,并且会小于父数据帧的列数。
我们还可以使用父数据帧中所需列的位置索引来引用列名而不提及真实名称
首先导入 spark 隐式,它充当帮助类,以使用 $-notation 来访问使用位置索引的列
import spark.implicits._
import org.apache.spark.sql.functions._
val child3_DF = parentDF.select("_c0","_c1","_c2","_c8","_c9").show()
我们还可以根据特定条件选择列。假设我们只想在子数据框中选择偶数列。甚至我们指的是偶数索引列和索引从'0'开始
val parentColumns = parentDF.columns.toList
res0: List[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7,_c8,_c9)
val evenParentColumns = res0.zipWithIndex.filter(_._2 % 2 == 0).map( _._1).toSeq
res1: scala.collection.immutable.Seq[String] = List(_c0, _c2, _c4, _c6,_c8)
现在提供这些列以从 parentDF 中选择。注意选择 API 需要 seq 类型参数。所以我们将“evenParentColumns”转换为 Seq 集合
val child4_DF = parentDF.select(res1.head, res1.tail:_*).show()
这将显示父 Dataframe 中的偶数索引列。
| _c0 | _c2 | _c4 |_c6 |_c8 |
|ITE00100554|TMAX|空|电子| 1 |
|TE00100554 |TMIN|空|电子| 4 |
|GM000010962|PRCP|null|电子| 7 |
所以现在我们只剩下数据框中的偶数列了
同样,我们也可以对 Dataframe 列应用其他操作,如下所示
val child5_DF = parentDF.select($"_c0", $"_c8" + 1).show()
因此,通过上述多种方式,我们可以选择 Dataframe 中的列。
【讨论】:
已解决, 只需对数据框使用 select 方法来选择列:
val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\cabs_trajectories\\cabs_trajectories\\green\\2014\\green_tripdata_2014-09.csv")
val df1=df.select("_c0")
这将子集数据框的第一列
【讨论】:
我喜欢 dehasis 方法,因为它允许我一步选择、重命名和转换列。但是我必须对其进行调整以使其在 PySpark 中为我工作:
from pyspark.sql.functions import col
spark.read.csv(path).select(
col('_c0').alias("stn").cast('String'),
col('_c1').alias("wban").cast('String'),
col('_c2').alias("lat").cast('Double'),
col('_c3').alias("lon").cast('Double')
)
.where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)
【讨论】:
只需使用 select select,您就可以选择特定的列,为它们提供可读的名称并进行转换。比如这样:
spark.read.csv(path).select(
'_c0.alias("stn").cast(StringType),
'_c1.alias("wban").cast(StringType),
'_c2.alias("lat").cast(DoubleType),
'_c3.alias("lon").cast(DoubleType)
)
.where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)
【讨论】:
import spark.implicits._ 你可以在我的回购github.com/dehasi/odsc05/blob/master/observatory/src/main/scala/… 中看到的整个代码
您可以使用以下代码根据索引(位置)选择列。您可以更改变量 colNos 的数字以仅选择那些列
import org.apache.spark.sql.functions.col
val colNos = Seq(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35)
val Df_01 = Df.select(colNos_01 map Df.columns map col: _*)
Df_01.show(20, false)
【讨论】:
问题是在与其他连接后选择数据框上的列 数据框。
我在下面尝试并从连接中选择了salaryDf 的列 数据框。
希望这会有所帮助
val empDf=spark.read.option("header","true").csv("/data/tech.txt")
val salaryDf=spark.read.option("header","true").csv("/data/salary.txt")
val joinData= empDf.join(salaryDf,empDf.col("first") === salaryDf.col("first") and empDf.col("last") === salaryDf.col("last"))
//**below will select the colums of salaryDf only**
val finalDF=joinData.select(salaryDf.columns map salaryDf.col:_*)
//same way we can select the columns of empDf
joinData.select(empDf.columns map empDf.col:_*)
【讨论】:
joinData= empDf.join(salaryDf,Seq("first","last")) 在 python 中使用 joinData= empDf.join(salaryDf,["first","last"]) 希望对您有所帮助