[Title]: Apache Spark + Parquet not Respecting Configuration to use "Partitioned" Staging S3A Committer
[Posted]: 2020-07-12 07:00:18
[Question]:

I'm using Apache Spark (3.0) on my local machine, which has no Hadoop installation, to write partitioned data (Parquet files) to AWS S3. When I have many files to write across roughly 50 partitions (partitionBy = date), I hit a FileNotFoundException while writing to S3.

I then came across the new S3A committers, so I tried to configure the "partitioned" staging committer. But when the file format is "parquet", I can still see Spark using ParquetOutputCommitter instead of PartitionedStagingCommitter, and when I have a large amount of data to write I still get the FileNotFoundException.

My configuration:

        sparkSession.conf().set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2);
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.name", "partitioned");
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.magic.enabled", false);
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append");
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.unique-filenames", true);
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", true);
        sparkSession.conf().set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        sparkSession.conf().set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol");
        sparkSession.conf().set("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter");
        sparkSession.conf().set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "tmp/staging");
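One thing worth checking (a sketch under an assumption, not a confirmed fix): `spark.hadoop.*` options set through `sparkSession.conf().set()` after the session already exists may arrive too late to reach the Hadoop `Configuration` that the committer factory reads. A minimal sketch that supplies the same settings while building the session instead; the master and app name here are hypothetical:

```java
import org.apache.spark.sql.SparkSession;

public class CommitterSessionSketch {
    public static void main(String[] args) {
        // Passing the committer settings to the builder puts them in the SparkConf
        // before any S3A filesystem or output committer is instantiated.
        SparkSession spark = SparkSession.builder()
                .master("local[*]")                     // hypothetical local master
                .appName("partitioned-committer-demo")  // hypothetical app name
                .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
                .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
                .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
                        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
                .config("spark.sql.sources.commitProtocolClass",
                        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
                .config("spark.sql.parquet.output.committer.class",
                        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
                .getOrCreate();

        // The builder-supplied value is visible in the SparkContext's conf.
        System.out.println(
                spark.sparkContext().getConf().get("spark.hadoop.fs.s3a.committer.name"));
        spark.stop();
    }
}
```

With the settings applied this way, a committer-name lookup on the resulting context returns "partitioned" rather than falling back to the default.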

What am I doing wrong? Can someone help?

Note: I have already filed a JIRA against Spark, but no help so far: SPARK-31072

===============================================================

I tried the answer from @Rajadayalan, but it is still using FileOutputCommitter. I also tried downgrading Spark to 2.4.5, without any luck.

20/04/06 12:44:52 INFO ParquetFileFormat: Using user defined output committer for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
20/04/06 12:44:52 WARN AbstractS3ACommitterFactory: **Using standard FileOutputCommitter to commit work**. This is slow and potentially unsafe.
20/04/06 12:44:52 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
20/04/06 12:44:52 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/04/06 12:44:52 INFO AbstractS3ACommitterFactory: Using Commmitter FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200406124452_0000}; taskId=attempt_20200406124452_0000_m_000000_0, status=''}; org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@61deb03f}; outputPath=s3a://******/observation, workPath=s3a://******/observation/_temporary/0/_temporary/attempt_20200406124452_0000_m_000000_0, algorithmVersion=2, skipCleanup=false, ignoreCleanupFailures=false} for s3a://********/observation
20/04/06 12:44:53 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
20/04/06 12:44:54 INFO CodeGenerator: Code generated in 81.077046 ms
20/04/06 12:44:54 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
20/04/06 12:44:54 INFO CodeGenerator: Code generated in 31.993775 ms
20/04/06 12:44:54 INFO CodeGenerator: Code generated in 9.967359 ms

Note: I don't have Spark installed locally, so I added spark-hadoop-cloud_2.11 as a compile-time dependency. My build.gradle looks like this:

    compile group: 'org.apache.spark', name: 'spark-hadoop-cloud_2.11', version: '2.4.2.3.1.3.0-79'
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.0'
    // https://mvnrepository.com/artifact/org.apache.parquet/parquet-column
    compile group: 'org.apache.parquet', name: 'parquet-column', version: '1.10.1'
    // https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop
    compile group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.10.1'
    compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.1'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sketch
    compile group: 'org.apache.spark', name: 'spark-sketch_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst
    compile group: 'org.apache.spark', name: 'spark-catalyst_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-tags
    compile group: 'org.apache.spark', name: 'spark-tags_2.11', version: '2.4.5'
    compile group: 'org.apache.spark', name: 'spark-avro_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.spark/spark-hive
    compile group: 'org.apache.spark', name: 'spark-hive_2.11', version: '2.4.5'
    // https://mvnrepository.com/artifact/org.apache.xbean/xbean-asm6-shaded
    compile group: 'org.apache.xbean', name: 'xbean-asm7-shaded', version: '4.15'
   compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.2.1'
//    compile group: 'org.apache.hadoop', name: 'hadoop-s3guard', version: '3.2.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.2.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-client', version: '3.2.1'
    compile group: 'com.amazonaws', name: 'aws-java-sdk-bundle', version: '1.11.271'

[Discussion]:

    Tags: apache-spark hadoop amazon-s3 parquet


    [Solution 1]:

    Had the same problem. The solution from How To Get Local Spark on AWS to Write to S3 got the PartitionedStagingCommitter loaded. You also have to download the spark-hadoop-cloud jar mentioned in that solution.

    I'm also on Spark 3.0, and this version of the jar worked: https://repo.hortonworks.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/2.4.2.3.1.3.0-79/

    Settings in my spark-defaults.conf:

    spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
    spark.hadoop.fs.s3a.committer.name                           partitioned
    spark.hadoop.fs.s3a.committer.magic.enabled                  false
    spark.hadoop.fs.s3a.committer.staging.conflict-mode          append
    spark.hadoop.fs.s3a.committer.staging.unique-filenames       true
    spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads  true
    spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a    org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
    spark.sql.sources.commitProtocolClass                        org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    spark.sql.parquet.output.committer.class                     org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
    

    [Comments]:

    • Thanks. But the writer is somehow still using FileOutputCommitter. See my updated question for details.
    • With a small modification I was able to get this working. Thanks.
    [Solution 2]:

    I made one small change compared to @Rajadayalan's suggestion. In addition to the sparkSession.config().set() calls from the original question, I added option() parameters on the DataFrameWriter when writing the parquet files:

     df.distinct()
                   .withColumn("date", date_format(col(EFFECTIVE_PERIOD_START), "yyyy-MM-dd"))
                   .repartition(col("date"))
                   .write()
                   .format(fileFormat)
                   .partitionBy("date")
                   .mode(SaveMode.Append)
                   .option("fs.s3a.committer.name", "partitioned")
                   .option("fs.s3a.committer.staging.conflict-mode", "append")
                   .option("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
                   .option("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
                   .option("compression", compressionCodecName.name().toLowerCase())
                   .save(DOWNLOADS_NON_COMPACT_PATH);
    

    This made the difference, and the following log output shows that it is using the PartitionedStagingCommitter:

    I can also see that the _SUCCESS file in S3 is now JSON rather than an empty touch file (_SUCCESS).

    20/04/06 14:27:26 INFO ParquetFileFormat: Using user defined output committer for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
    20/04/06 14:27:26 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
    20/04/06 14:27:26 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
    20/04/06 14:27:26 INFO AbstractS3ACommitterFactory: Using committer partitioned to output data to s3a://************/observation
    20/04/06 14:27:26 INFO AbstractS3ACommitterFactory: Using Commmitter PartitionedStagingCommitter{StagingCommitter{AbstractS3ACommitter{role=Task committer attempt_20200406142726_0000_m_000000_0, name=partitioned, outputPath=s3a://*********/observation, workPath=file:/tmp/hadoop-**********/s3a/local-1586197641397/_temporary/0/_temporary/attempt_20200406142726_0000_m_000000_0}, conflictResolution=APPEND, wrappedCommitter=FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200406142726_0000}; taskId=attempt_20200406142726_0000_m_000000_0, status=''}; org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@4494e88a}; outputPath=file:/Users/**********/Downloads/SparkParquetSample/tmp/staging/**********/local-1586197641397/staging-uploads, workPath=null, algorithmVersion=1, skipCleanup=false, ignoreCleanupFailures=false}}} for s3a://parquet-uuid-test/device-metric-observation6
    20/04/06 14:27:27 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
    20/04/06 14:27:27 INFO CodeGenerator: Code generated in 52.744811 ms
    20/04/06 14:27:27 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but current version of codegened fast hashmap does not support this aggregate.
    20/04/06 14:27:27 INFO CodeGenerator: Code generated in 48.78277 ms
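    The JSON _SUCCESS file above is itself a quick way to confirm which committer ran: the S3A committers write a JSON summary there, while the classic FileOutputCommitter leaves a zero-byte marker. A minimal sketch of that check against a sample manifest string (the manifest excerpt here is illustrative, not a real run; in practice you would download the _SUCCESS object from S3 first):

    ```java
    public class SuccessFileCheck {
        public static void main(String[] args) {
            // Hypothetical excerpt of an S3A committer _SUCCESS manifest; the real
            // file also carries timestamps, metrics and the committed file list.
            String success = "{\"name\":\"org.apache.hadoop.fs.s3a.commit.files.SuccessData/1\","
                    + "\"committer\":\"partitioned\",\"hostname\":\"local\"}";

            // Zero-length _SUCCESS => classic FileOutputCommitter;
            // a JSON body names the S3A committer that produced the output.
            if (success.isEmpty()) {
                System.out.println("classic FileOutputCommitter");
            } else if (success.contains("\"committer\":\"partitioned\"")) {
                System.out.println("PartitionedStagingCommitter");
            } else {
                System.out.println("unknown committer");
            }
        }
    }
    // prints "PartitionedStagingCommitter"
    ```

    This avoids having to grep driver logs after every write: an empty marker file means the unsafe rename-based commit path was still in use.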
    

    [Comments]:
