【问题标题】:How to load mixed Parquet schema into DataFrame using Apache Spark?如何使用 Apache Spark 将混合 Parquet 模式加载到 DataFrame 中?
【发布时间】:2018-11-30 21:56:38
【问题描述】:

我有一个 Spark 作业,不断将 Parquet 文件上传到 S3(带分区)。
这些文件都具有相同的镶木地板架构。

其中一种字段类型最近已更改(从 String 更改为 long),因此某些分区的 parquet 模式是混合的。

具有两种类型的混合数据的地方现在无法读取某些内容。
虽然看起来我可以执行:sqlContext.read.load(path)
当尝试对 DataFrame 应用任何 fetch 操作时(例如collect),操作失败并显示ParquetDecodingException

我打算迁移数据并重新格式化它但未能将混合内容读入 DataFrame
如何使用 Apache Spark 将混合分区加载到 DataFrame 或任何其他 Spark 构造中?

以下是 ParquetDecodingException 跟踪:

scala> df.collect
[Stage 1:==============>        (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 1 in block 0 in file 
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)

【问题讨论】:

    标签: apache-spark dataframe amazon-s3 parquet


    【解决方案1】:

    据我所知,您不能将具有相同字段和不同类型的 2 个模式混合在一起。因此,我能想到的唯一解决方案是:

    1. List files of partition

    2. 将每个文件重新写入新位置并transform the data to the right schame

    3. 如果原始数据已分区,则需要再通过一次才能恢复分区。
      这是因为逐个文件重写数据会覆盖分区。
    4. 检查您是否可以将所有新分区读取为正确的架构。
    5. 删除“坏”分区并改为复制 tmp 分区

    【讨论】:

    • 精确的答案,证明有效且正确。添加了步骤 (4) 以获得分区数据的完整答案,就像我的情况一样。
    【解决方案2】:

    还有一个想法:不是更改现有字段的类型(field_string),而是添加一个长类型的新字段(field_long)并将读取数据的代码更新为类似这样的内容(在伪代码中)并启用架构合并。我相信它默认启用,但这是一个明确说明的好案例:

    sqlContext.read.option("mergeSchema", "true").parquet(<parquet_file>)
    
    ...
    
    if isNull(field_long) 
      field_value_long = field_string.value.to_long
    else
      field_value_long = field_long.value
    

    【讨论】:

    • 自 Spark 1.5 起,模式合并已默认禁用。 sqlContext 也已弃用,在编写新代码时更喜欢 val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-23
    • 2017-03-14
    • 1970-01-01
    • 2019-02-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多