【问题标题】:Saving empty dataframe to parquet results in error - Spark 2.4.4将空数据框保存到镶木地板会导致错误 - Spark 2.4.4
【发布时间】:2020-03-18 18:40:34
【问题描述】:

我有一段代码,最后我将数据帧写入镶木地板文件。

逻辑使得数据框有时可能为空,因此我收到以下错误。

df.write.format("parquet").mode("overwrite").save(somePath)

org.apache.spark.sql.AnalysisException: Parquet data source does not support null data type.;

当我打印“df”的模式时,我得到了下面。

df.schema
res2: org.apache.spark.sql.types.StructType = 
StructType(
    StructField(rpt_date_id,IntegerType,true), 
    StructField(rpt_hour_no,ShortType,true), 
    StructField(kpi_id,IntegerType,false), 
    StructField(kpi_scnr_cd,StringType,false), 
    StructField(channel_x_id,IntegerType,false), 
    StructField(brand_id,ShortType,true), 
    StructField(kpi_value,FloatType,false), 
    StructField(src_lst_updt_dt,NullType,true), 
    StructField(etl_insrt_dt,DateType,false), 
    StructField(etl_updt_dt,DateType,false)
)

是否有一种解决方法可以只使用架构写入空文件,或者在为空时根本不写入文件? 谢谢

【问题讨论】:

    标签: scala apache-spark apache-spark-sql parquet


    【解决方案1】:

    您遇到的错误与您的数据框为空这一事实无关。我看不出保存空数据框的意义,但如果你愿意,你可以这样做。如果你不相信我,试试这个:

    val schema = StructType( 
        Array(
            StructField("col1",StringType,true),  
            StructField("col2",StringType,false)
        )
    )
    
    spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
         .write
         .format("parquet")
         .save("/tmp/test_empty_df")
    

    您收到该错误是因为您的列之一是 NullType,并且抛出的异常表明 "Parquet data source does not support null data type"

    我无法确定为什么您有一个 Null 类型的列,但是当您从源读取数据并让 spark 推断架构时通常会发生这种情况。如果该源中有一个空列,spark 将无法推断架构并将其设置为 null 类型。

    如果发生这种情况,我的建议是您在读取时指定架构

    如果不是这种情况,一个可能的解决方案是将 NullType 的所有列强制转换为 parquet 兼容类型(如 StringType)。这是一个关于如何做到这一点的例子:

    //df is a dataframe with a column of NullType
    val df = Seq(("abc",null)).toDF("col1", "col2")
    df.printSchema
    root
     |-- col1: string (nullable = true)
     |-- col2: null (nullable = true)
    
    
    //fold left to cast all NullType to StringType
    val df1 = df.columns.foldLeft(df){
        (acc,cur) => {
            if(df.schema(cur).dataType == NullType)
                acc.withColumn(cur, col(cur).cast(StringType))
            else
                acc
        }
    }
    df1.printSchema
    root
     |-- col1: string (nullable = true)
     |-- col2: string (nullable = true)
    

    希望对你有帮助

    【讨论】:

    • 我有一个带有架构的空 DF 如何将其写为镶木地板文件
    【解决方案2】:

    '还是空时根本不写文件?'检查df是否不为空,然后只写。

    if (!df.isEmpty)
      df.write.format("parquet").mode("overwrite").save("somePath")
    

    【讨论】:

    • 我认为如果df 是一个大的Dataframe,最好添加try-catch 而不是!df.isEmpty。但是您的代码也适用于这个用例。
    • @CesarA.Mostacero 大Dataframe 有什么问题?
    • 没有什么“错”,只是我们可以避免执行2个动作,只用一个就可以达到目标。
    • 是的 try-catch 可以正常工作,但是必须捕获 try-catch 特定的异常(org.apache.spark.sql.AnalysisException),因为其他问题,内存不足异常,hdfs 异常等也可以抛出。
    猜你喜欢
    • 2020-03-11
    • 1970-01-01
    • 1970-01-01
    • 2017-04-25
    • 2020-10-23
    • 2016-07-04
    • 1970-01-01
    • 2020-08-14
    • 1970-01-01
    相关资源
    最近更新 更多