【问题标题】:How to change Spark Dataframe column data type in an array如何更改数组中的 Spark Dataframe 列数据类型
【发布时间】:2017-06-12 04:22:31
【问题描述】:

关于我的一个更大的问题,我遇到了两个小问题:我想每天读取一次 JSON 数据并将其保存为 Parquet 以供以后与数据相关的工作。使用镶木地板要快得多。但是我遇到的问题是,在读取该 parquet 时,Spark 总是尝试从模式文件中获取模式,或者只是从第一个 parquet 文件中获取模式,并假定所有文件的模式都是相同的。但有些情况下,我们在某些列中几天没有任何数据。

假设我有一个 JSON 文件,其中的数据具有以下架构:

root
 |-- Id: long (nullable = true)    
 |-- People: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Amount: double (nullable = true)

然后我有另一个 JSON 文件,其中没有“人员”列的数据。因此架构如下:

root
 |-- Id: long (nullable = true)    
 |-- People: array (nullable = true)
 |    |-- element: string (containsNull = true)

当我将它们与read.json 一起读入时,Spark 会遍历所有文件并从中推断出合并的架构,更具体地说是从第一个文件中推断出合并的架构,然后将第二个文件中的行留空,但架构是正确。

但是当我分别阅读这些内容并分别写入 parquet 时,我无法将它们一起阅读,因为对于 Parquet,架构不匹配并且我收到错误消息。

我的第一个想法是读取缺少数据的文件并通过强制转换列类型以匹配第一个模式来手动更改其模式,但是这种手动转换是错误的,它可能不同步,我什至不知道如何将此字符串类型转换为数组或结构类型。

另一个问题是当“金额”字段只有完整的整数时,Spark 会根据需要将它们读取为 longs 而不是 doubles。但如果我使用:

val df2 = df.withColumn("People.Amount", col("People.Amount").cast(org.apache.spark.sql.types.ArrayType(org.apache.spark.sql.types.DoubleType,true)))

那么它并没有改变原来列的类型,而是增加了一个名为People.Amount的新列

【问题讨论】:

    标签: json scala apache-spark spark-dataframe parquet


    【解决方案1】:

    我认为您可以通过模式合并来解决问题(请参阅文档here)。如果您拥有的第一个镶木地板具有正确的架构,那么您可以执行类似的操作将该架构应用于新的实木复合地板吗?

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()
    

    编辑

    你说有 200 多列,你都知道了吗?我看到了两条前进的道路,并且可能有很多方法可以实现这一目标。一种是您预先定义所有可以看到的字段。我过去所做的是创建一个带有单个虚拟记录的 json 文件,其中包含我想要的所有字段,并且完全按照我想要的方式输入。然后,您可以始终与“星期一”或“星期二”数据集同时加载该记录,并在加载后将其删除。这可能不是最佳实践,但这就是我在前进的道路上跌跌撞撞的方式。

    另一种方法是停止尝试在正确的架构中加载/保存单个数据集,并在加载所有数据后设置架构。听起来不像你想走的路,但至少你不会遇到这个特定的问题。

    【讨论】:

    • 我很确定我已经尝试过了。 Spark 1.6 和 2.0 之间有什么区别吗?但无论如何,如果假设星期一数据具有第一个模式,星期二数据具有第二个模式(在某些字段中没有数据)并且星期三数据再次具有第一个模式,那么我永远无法确定“第一个”镶木地板具有“正确的”模式。如果我想在星期一和星期二一起阅读,它有,但如果我想要星期二和星期三,那么第一个不是正确的模式,它不会工作。我会很快再试一次,但我认为这不起作用,我可能遗漏了一些重要的东西
    • 嗯,是的,我又试了一次,得到了我预期的结果。这不起作用,因为合并镶木地板仅在您添加列时才有效。当某些列的类型不同时,它会失败。我知道这是有道理的,但我不知道如何解决这个问题。我不知道如何告诉 Spark 我在嵌套结构中的 JSON 文件中有 200 多个属性字段的确切模式。我收到以下错误:org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Name,StringType,true), StructField(Amount,DoubleType,true)),true) and StringType
    • 好吧,我们当前流程的工作方式是,我们的 GoLang 服务器记录一个事件(三种类型之一)并将 Go 结构编组为 JSON 字符串并将其发送到相应的 Firehose 流(三种类型之一) )。然后将带有原始 json 数据的文件保存到 S3,并带有特定的类型和日期前缀,即s3://bucket/Type1/2017/01/27/00。当我们开发系统时,我们的 Go Lang 结构模式会发生变化,因此数据会有所不同。当然,在这种情况下,我们主要会添加数据列。问题是当数据本身丢失并且 spark 读取类型错误的数据时。
    • 对 - 您的流程的一部分是将包含所有可用列的 1 行文件移动到同一位置,然后与传入数据合并 - 这意味着所有数据都将以正确的格式加载,然后您只需删除一条虚拟记录。
    • 其实这似乎是合理的。我会试试这个。谢谢
    猜你喜欢
    • 2018-05-14
    • 2015-06-05
    • 2016-06-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多