【Question Title】: Strange error while writing parquet file to s3
【Posted】: 2020-02-19 07:21:30
【Question Description】:

When trying to write a DataFrame to S3, I get the error below with a NullPointerException. Sometimes the write succeeds; sometimes it fails.

I am using EMR 5.20 with Spark 2.4.0.

Spark session creation:

val spark = SparkSession.builder
        .config("spark.sql.parquet.binaryAsString", "true")
        .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
        .config("spark.sql.parquet.filterPushdown", "true")
        .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled","true")
        .getOrCreate()

spark.sql("myQuery").write
  .partitionBy("partitionColumn")
  .mode(SaveMode.Overwrite)
  .option("inferSchema", "false")
  .parquet("s3a://...filePath")

Can anyone help solve this mystery? Thanks in advance.

java.lang.NullPointerException
  at com.amazon.ws.emr.hadoop.fs.s3.lite.S3Errors.isHttp200WithErrorCode(S3Errors.java:57)
  at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:100)
  at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
  at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:127)
  at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:364)
  at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1372)
  at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
  at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:332)
  at org.apache.spark.internal.io.FileCommitProtocol.deleteWithJob(FileCommitProtocol.scala:124)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.deleteMatchingPartitions(InsertIntoHadoopFsRelationCommand.scala:223)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:122)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557)
  ... 55 elided

【Question Comments】:

  • Spark has multiple write strategies. Judging from the trace, the error comes from the S3 side. Instead of s3a://..., try the s3 strategy, so the path becomes s3://... (a minimal illustration follows these comments).
  • It may be a syntax issue... try this: val spark = (SparkSession.builder() .config("spark.sql.parquet.binaryAsString", "true") .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") .config("spark.sql.parquet.filterPushdown", "true") .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled","true") .getOrCreate())
  • Hi, I tried both s3 and s3a. It only happens when we try to overwrite; the job runs fine when there are no files in the output path.
  • @DineshJ did you ever find a solution? I am seeing exactly the same behavior.
  • @DineshJ any solution? It works perfectly when I write to CSV, but not with Parquet.
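
A minimal sketch of the scheme swap suggested in the first comment, reusing the session and query from the question; the bucket and path are placeholders:

import org.apache.spark.sql.SaveMode

// same write as in the question, but with the "s3://" scheme instead of "s3a://"
spark.sql("myQuery").write
  .partitionBy("partitionColumn")
  .mode(SaveMode.Overwrite)
  .parquet("s3://my-bucket/filePath")  // hypothetical bucket and path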

Tags: scala apache-spark amazon-s3 apache-spark-sql amazon-emr


【Solution 1】:

Looks like a bug in the AWS code. That code is closed source, so you will have to take it up with them.

I do see a hint that it is a bug in the code that tries to parse an error response: perhaps something did fail, but the client-side code that relays that error response is itself broken. Sadly, that is not unusual; failure handling rarely gets enough test coverage.

【Comments】:

【Solution 2】:

You are using SaveMode.Overwrite, and the frame com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:127) in the stack trace indicates that something went wrong during the delete step of that overwrite.

I would check that the S3 permissions in the IAM policy of your EMR EC2 instance profile allow the s3:DeleteObject action on the file path you are writing Parquet to. It should look something like this:

{
  "Sid": "AllowWriteAccess",
  "Action": [
    "s3:DeleteObject",
    "s3:Get*",
    "s3:List*",
    "s3:PutObject"
  ],
  "Effect": "Allow",
  "Resource": [
    "<arn_for_your_filepath>/*"
  ]
}
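
To verify the permission from inside the job itself, one option is to probe the output prefix with the Hadoop FileSystem API; a minimal sketch, assuming an active Spark session, with bucket and prefix as placeholders:

import org.apache.hadoop.fs.Path

// create and then delete a throwaway object under the output prefix;
// the delete exercises the same s3:DeleteObject permission the overwrite needs
val conf  = spark.sparkContext.hadoopConfiguration
val probe = new Path("s3://my-bucket/filePath/_permission_probe")  // hypothetical path
val fs    = probe.getFileSystem(conf)
fs.create(probe).close()   // needs s3:PutObject
fs.delete(probe, false)    // needs s3:DeleteObject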
    

Between jobs, are you using different file paths in the calls that write Parquet? If so, that would explain the intermittent job failures.

【Comments】:

• This solved my problem. I could not change the permissions, so I wrote to a new S3 path instead. (A minimal sketch of that workaround follows.)
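
A minimal sketch of the workaround from the comment above: write each run to a fresh, timestamped path so the overwrite's delete step never has to touch existing objects. The bucket and prefix are placeholders, and the session and query are the ones from the question:

import org.apache.spark.sql.SaveMode

// a unique output path per run avoids the deleteObjects call that fails above
val outputPath = s"s3://my-bucket/output/run_ts=${System.currentTimeMillis()}"
spark.sql("myQuery").write
  .partitionBy("partitionColumn")
  .mode(SaveMode.ErrorIfExists)  // the path is new, so nothing needs deleting
  .parquet(outputPath)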