【问题标题】:spark Scala RDD to DataFrame Date formatspark Scala RDD 到 DataFrame 日期格式
【发布时间】:2019-11-14 14:43:42
【问题描述】:

您能在这个火花概率声明中提供帮助吗

数据 -

empno|ename|designation|manager|hire_date|sal|deptno    
7369|SMITH|CLERK|9902|2010-12-17|800.00|20
7499|ALLEN|SALESMAN|9698|2011-02-20|1600.00|30

代码:

val rawrdd = spark.sparkContext.textFile("C:\\Users\\cmohamma\\data\\delta scenarios\\emp_20191010.txt")

val refinedRDD = rawrdd.map( lines => {   
val fields = lines.split("\\|")   (fields(0).toInt,fields(1),fields(2),fields(3).toInt,fields(4).toDate,fields(5).toFloat,fields(6).toInt)  
})

问题陈述 - 这不起作用 -fields(4).toDate ,有什么替代方案或用途是什么?

我尝试了什么?

  1. 尝试将其替换为 - to_date(col(fields(4)) , "yyy-MM-dd") - 不工作

2.

第 1 步。

val refinedRDD = rawrdd.map( lines => {   
val fields = lines.split("\\|")    
(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6))
})

现在这个元组都是字符串

第 2 步。

mySchema = StructType(StructField(empno,IntegerType,true), StructField(ename,StringType,true), StructField(designation,StringType,true), StructField(manager,IntegerType,true), StructField(hire_date,DateType,true), StructField(sal,DoubleType,true), StructField(deptno,IntegerType,true))

步骤 3. 将字符串元组转换为行

val rowRDD = refinedRDD.map(attributes => Row(attributes._1, attributes._2, attributes._3, attributes._4, attributes._5 , attributes._6, attributes._7))

第 4 步。

val empDF = spark.createDataFrame(rowRDD, mySchema)

这也不起作用,并给出与类型相关的错误。为了解决这个问题,我将步骤 1 更改为

(fields(0).toInt,fields(1),fields(2),fields(3).toInt,fields(4),fields(5).toFloat,fields(6).toInt)

现在这是日期类型列的错误,我又遇到了主要问题。

用例 - 使用 textFile Api,在其上使用自定义架构 (StructType) 将其转换为数据框。

这可以使用案例类来完成,但在案例类中,我也会被困在我需要执行字段 (4).toDate 的地方(我知道我可以稍后在代码中将字符串转换为日期,但如果出现上述问题解决办法)

【问题讨论】:

  • 为什么不简单地读取为 csv,使用 inferSchema 或提供自定义模式? val df = spark.read .option("delimiter", "\\|") .option("header", true) .option("inferSchema", "true") .csv(path) 应该足以读取数据帧。
  • @ShankarKoirala 因为该文件不是 csv,它是带有分隔符管道的 .dat 文件,我在步骤 2 中创建自定义模式,并在步骤 3 中将元组转换为行时出错使用该 rdd 行创建数据框。你知道一种将自定义模式附加到 rdd i 命令以创建数据框的方法吗?

标签: scala apache-spark


【解决方案1】:

可以使用下面的代码sn-p

import org.apache.spark.sql.functions.to_timestamp

scala> val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").load("gs://otif-etl-input/test.csv")
df: org.apache.spark.sql.DataFrame = [empno: string, ename: string ... 5 more fields]

scala> val ts = to_timestamp($"hire_date", "yyyy-MM-dd")
ts: org.apache.spark.sql.Column = to_timestamp(`hire_date`, 'yyyy-MM-dd')

scala> val enriched_df = df.withColumn("ts", ts).show(2, false)
+-----+-----+-----------+-------+----------+-------+----------+-------------------+
|empno|ename|designation|manager|hire_date |sal    |deptno    |ts                 |
+-----+-----+-----------+-------+----------+-------+----------+-------------------+
|7369 |SMITH|CLERK      |9902   |2010-12-17|800.00 |20        |2010-12-17 00:00:00|
|7499 |ALLEN|SALESMAN   |9698   |2011-02-20|1600.00|30        |2011-02-20 00:00:00|
+-----+-----+-----------+-------+----------+-------+----------+-------------------+

enriched_df: Unit = ()

【讨论】:

  • 感谢您的回复,非常感谢。正如我确实提到的(我知道我可以稍后在代码中将字符串转换为日期,但如果上述问题解决方案是可能的)。数据文件基本上是.dat格式,分隔符是管道。
  • 文件不必是CSV,可以是任何文本文件。我们正在使用 csv 解析器来处理它,所以上面的代码也应该适用于 .DAT 文件。如果这个答案有帮助,请接受答案。
  • 非常感谢你让我清醒,现在我完全理解了 csv 包装器。这很好用。再次感谢。
  • 很高兴知道它有效,请接受答案。
  • 通过在 txt 或 dat 文件上使用 CSV 解析器,我可以推断架构并且也可以附加自定义架构。
【解决方案2】:

有多种方法可以将您的数据转换为适当的数据类型。

首先:使用InferSchema

val df = spark.read .option("delimiter", "\\|").option("header", true) .option("inferSchema", "true").csv(path)
df.printSchema

有时它不会按预期工作。查看详情here

第二:提供自己的数据类型转换模板

val rawDF = Seq(("7369", "SMITH" , "2010-12-17", "800.00"), ("7499", "ALLEN","2011-02-20", "1600.00")).toDF("empno", "ename","hire_date", "sal")
//define schema in DF , hire_date as Date
val schemaDF = Seq(("empno", "INT"), ("ename", "STRING"),  (**"hire_date", "date"**) , ("sal", "double")).toDF("columnName", "columnType")
rawDF.printSchema

    //fetch schema details
    val dataTypes = schemaDF.select("columnName", "columnType")
    val listOfElements = dataTypes.collect.map(_.toSeq.toList)
    //creating a map friendly template
    val validationTemplate = (c: Any, t: Any) => {
       val column = c.asInstanceOf[String]
       val typ = t.asInstanceOf[String]
       col(column).cast(typ)
      }

     //Apply datatype conversion template on rawDF 
    val convertedDF = rawDF.select(listOfElements.map(element => validationTemplate(element(0), element(1))): _*)
    println("Conversion done!")
    convertedDF.show()
    convertedDF.printSchema

第三个:案例类

使用 ScalaReflection 从案例类创建架构,并在加载 DF 时提供此自定义架构。

  import org.apache.spark.sql.catalyst.ScalaReflection
  import org.apache.spark.sql.types._

  case class MySchema(empno: int, ename: String, hire_date: Date, sal: Double)

  val schema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]

  val rawDF = spark.read.schema(schema).option("header", "true").load(path)
  rawDF.printSchema

希望这会有所帮助。

【讨论】:

  • 非常感谢您的详细回答。请在下面找到我的查询/回复:
  • 感谢您的详细解答。请在下面找到我的回复: Soln1 - 该文件是一个 .dat 文件,可以使用 textFile api 读取,因此没有意义。我不认为我可以应用这样的模式。在案例分类的情况下,如果您看到问题,我实际上已经这样做了,但是当我的拆分字段(字符串)的输出与案例类参数(日期)映射时,我被卡住了。我看到一种解决方案 - 将 rdd 日期字段处理为字符串,将 rdd 转换为数据帧,然后将列转换为 sql 查询或 spark 函数式 sql。 ("hire_date", "date") 星号表示 ?
  • 嘿 Ehsan,如果我的数据集很大,我可能不会使用 inferSchema,我想让我的转换保持惰性。 (inferschema 从磁盘读取所有数据)我的印象是只能使用 csv 包装器解析 CSV 文件。感谢您的回复,我从您的回复中学到了一些动态概念。
猜你喜欢
  • 1970-01-01
  • 2022-11-02
  • 2020-07-24
  • 2017-06-13
  • 1970-01-01
  • 1970-01-01
  • 2022-09-27
  • 2017-02-03
  • 2017-04-11
相关资源
最近更新 更多