【问题标题】:Spark Repartition IssueSpark 重新分区问题
【发布时间】:2022-08-03 16:49:19
【问题描述】:

今天是个好日子, 我正在处理一个项目,我在 Spark (2.4.4) 和 PySpark 的帮助下对数百万条数据记录运行 ETL 过程。

我们从 AWS 巨大的压缩 CSV 文件中的 S3 存储桶中获取,将它们转换为 Spark Dataframes,使用 repartition() 方法并将每个部分转换为 parquet 数据以减轻和加速该过程:

        for file in files:
        if not self.__exists_parquet_in_s3(self.config[\'aws.output.folder\'] + \'/\' + file, \'.parquet\'):
            # Run the parquet converter
            print(\'**** Processing %s ****\' % file)
            # TODO: number of repartition variable
            df = SparkUtils.get_df_from_s3(self.spark_session, file, self.config[\'aws.bucket\']).repartition(94)
            s3folderpath = \'s3a://\' + self.config[\'aws.bucket\'] + \\
                           \'/\' + self.config[\'aws.output.folder\'] + \\
                           \'/%s\' % file + \'/\'
            print(\'Writing down process\')
            df.write.format(\'parquet\').mode(\'append\').save(
                \'%s\' % s3folderpath)
            print(\'**** Saving %s completed ****\' % file)
            df.unpersist()
        else:
            print(\'Parquet files already exist!\')

因此,作为第一步,这段代码在 s3 存储桶中搜索这些 parquet 文件是否存在,如果不存在,它将进入 for 循环并运行所有转换。

现在,让我们进入正题。我有这个管道,它适用于每个 csv 文件,除了一个与其他文件相同的管道,除了在镶木地板中重新分区和转换后也更重(29 MB x 94 部分与 900 kB x 32 部分)。

这会在过程中的一段时间后导致瓶颈(分为相同的周期,其中周期数等于重新分区的数量),从而提高了java堆内存空间几个警告后的问题:

WARN TaskSetManager: Stage X contains a task of very large size (x KB). The maximum recommended size is 100 KB。 (另见下面的图片)

第1部分]:

第2部分

最合乎逻辑的解决方案是进一步增加 repartition 参数以降低每个 parquet 文件的权重,但它不允许我创建超过 94 个分区,在 for 循环中的一段时间后(如上所述)它会引发此错误:

ERROR FileFormatWriter: Aborting job 8fc9c89f-dccd-400c-af6f-dfb312df0c72.
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: HGC6JTRN5VT5ERRR, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method., S3 Extended Request ID: 7VBu4mUEmiAWkjLoRPruTiCY3IhpK40t+lg77HDNC0lTtc8h2Zi1K1XGSnJhjwnbLagN/kS+TpQ=

或者还有:

第二种问题类型,注意警告

我注意到我可以对与原始值相关的文件进行分区:我可以使用 16 作为参数而不是 94,它会运行良好,但如果我将它增加到原始值 94 以上,它会赢\'不工作。

请记住,这条管道运行良好直到最后对于其他(较轻的)CSV 文件,这里唯一的变量似乎是输入文件(特别是大小),它似乎会在一段时间后停止。如果您需要任何其他详细信息,请告诉我,如果您能帮助我,我将非常高兴。提前谢谢大家。

    标签: python apache-spark pyspark apache-spark-sql etl


    【解决方案1】:

    根据您提供的代码和日志,不确定您的SparkUtils 中的逻辑是否与您的资源或分区无关,这可能是由您的 spark 应用程序和 S3 之间的连接引起的:

    com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403
    

    403:您的登录名无权访问您尝试读/写的存储桶/文件。虽然它来自关于身份验证的 Hadoop 文档,但您可以看到有几种情况会导致此错误:https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/troubleshooting_s3a.html#Authentication_Failure。正如您提到的,您在循环期间看到此错误,但在工作开始时没有,请检查您的 spark 作业的运行时间,以及 IAM 和会话身份验证,因为它可能是由会话到期(默认 1 小时)引起的,详细信息您可以查看:https://docs.aws.amazon.com/singlesignon/latest/userguide/howtosessionduration.html

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-03-02
      • 2019-08-26
      • 2015-10-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多