Title: Spark: Parquet DataFrame operations fail when forcing schema on read
Posted: 2018-02-16 20:02:37
Question:

(Spark 2.0.2)

The problem arises when you have parquet files with different schemas and force a schema during read. Even though you can print the schema and run show() fine, you cannot apply any filter logic to the missing columns.

Here are the two example schemas:

// assuming you are running this code in a spark REPL
import spark.implicits._

case class Foo(i: Int)
case class Bar(i: Int, j: Int) 

So Bar includes all the fields of Foo and adds one more (j). In real life this arises when you start with schema Foo and later decide you need more fields, ending up with schema Bar.

Let's simulate the two different parquet files.

// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")

We want to always read the data using the more general schema Bar. That is, rows written with schema Foo should have j be null.

Case 1: we read a mix of both schemas

spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
|  i|   j|
+---+----+
|  1|   2|
|  1|null|
+---+----+


spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+
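
As an aside (not part of the original question), schema merging can also be enabled globally for the session instead of per read; a minimal sketch, assuming Spark 2.x:

// Applies to all subsequent parquet reads in this session.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.read.parquet("/tmp/foo", "/tmp/bar").show()   // same merged result as above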

Case 2: we only have Bar data

spark.read.parquet("/tmp/bar").show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

Case 3: we only have Foo data

scala> spark.read.parquet("/tmp/foo").show()
+---+
|  i|
+---+
|  1|
+---+

The problematic case is 3, where the resulting schema is of type Foo, not Bar. Since we migrated to schema Bar, we want to always get schema Bar from our data (old and new).

The suggested solution would be to programmatically define the schema to always be Bar. Let's see how to do that:

val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false)) 
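
Note that Encoders.product derives nullable = false for Scala primitive fields, as seen above for both i and j. As an aside (not part of the original question), declaring the evolving field as Option[Int] in the case class yields a nullable field instead; a minimal sketch with a hypothetical variant of Bar:

case class BarOpt(i: Int, j: Option[Int])   // hypothetical: j is optional
val barOptSchema = org.apache.spark.sql.Encoders.product[BarOpt].schema
// barOptSchema: StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,true))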

Running show() works fine:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
|  i|   j|
+---+----+
|  1|null|
+---+----+

But if you try to filter on the missing column j, it fails.

scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Discussion:

    Tags: scala apache-spark dataframe schema parquet


    Solution 1:

    The problem is caused by Parquet filter pushdown, which is not handled correctly in older parquet-mr versions.

    See https://issues.apache.org/jira/browse/PARQUET-389 for more details.

    You can either upgrade the parquet-mr version, or add a new column and base the filter on the new column.

    For example:

    import org.apache.spark.sql.functions.{lit, when}

    val dfNew = df.withColumn("new_j", when($"j".isNotNull, $"j").otherwise(lit(null)))
    dfNew.filter($"new_j".isNotNull)
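
    Alternatively, if upgrading parquet-mr is not an option, you could sidestep the bug by disabling Parquet filter pushdown for the session. This is a sketch, not from the original answer; it trades the performance benefit of pushdown for correctness:

    // Workaround sketch: with pushdown off, Spark evaluates the predicate
    // itself instead of handing it to parquet-mr, so the missing column no
    // longer reaches SchemaCompatibilityValidator.
    spark.conf.set("spark.sql.parquet.filterPushdown", "false")
    spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()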

    Comments:

      Solution 2:

      It worked fine on Spark 1.6; the schema retrieval was changed and a HiveContext was used:

      import org.apache.spark.sql.catalyst.ScalaReflection
      import org.apache.spark.sql.types.StructType

      val barSchema = ScalaReflection.schemaFor[Bar].dataType.asInstanceOf[StructType]
      println(s"barSchema: $barSchema")
      hiveContext.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
      

      The result is:

      barSchema: StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false))
      +---+----+
      |  i|   j|
      +---+----+
      |  1|null|
      +---+----+
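
      For context, here is a minimal sketch of how the hiveContext above could be constructed on Spark 1.6 (the setup is assumed; it is not shown in the original answer):

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext

      // Illustrative setup only: the app name is a placeholder.
      val sc = new SparkContext(new SparkConf().setAppName("schema-on-read"))
      val hiveContext = new HiveContext(sc)
      import hiveContext.implicits._   // brings the $"..." column syntax into scope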
      

      Comments:

      • Thanks for the help! Maybe it is a regression in 2.0.2. In any case, I found the trick listed in the answers to work around the problem.
      Solution 3:

      What worked for me was to use the createDataFrame API with an RDD[Row] and the new schema (at least with the new columns being nullable).

      // Make the columns nullable (probably you don't need to make them all nullable)
      val barSchemaNullable = org.apache.spark.sql.types.StructType(
         barSchema.map(_.copy(nullable = true)).toArray)
      
      // We create the df (but this is not what you want to use, since it still has the same issue)
      val df = spark.read.schema(barSchemaNullable).parquet("/tmp/foo")
      
      // Here is the final API that give a working DataFrame
      val fixedDf = spark.createDataFrame(df.rdd, barSchemaNullable)
      
      fixedDf.filter($"j".isNotNull).show()
      
      +---+---+
      |  i|  j|
      +---+---+
      +---+---+
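
      A related workaround (a sketch, not from the original answers): instead of forcing the schema at read time, read the file with its own schema and add any missing column explicitly. The filter then targets a computed column, so no predicate on a column unknown to Parquet is pushed down:

      import org.apache.spark.sql.functions.lit
      import org.apache.spark.sql.types.IntegerType

      val raw = spark.read.parquet("/tmp/foo")        // schema contains only i
      val aligned =
        if (raw.columns.contains("j")) raw
        else raw.withColumn("j", lit(null).cast(IntegerType))  // add the missing column as null

      aligned.filter($"j".isNotNull).show()           // empty result, no exception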
      

      Comments:
