【发布时间】:2022-08-03 16:49:19
【问题描述】:
今天是个好日子,
我正在处理一个项目,我在 Spark (2.4.4) 和 PySpark 的帮助下对数百万条数据记录运行 ETL 过程。
我们从 AWS 巨大的压缩 CSV 文件中的 S3 存储桶中获取,将它们转换为 Spark Dataframes,使用 repartition() 方法并将每个部分转换为 parquet 数据以减轻和加速该过程:
for file in files:
if not self.__exists_parquet_in_s3(self.config[\'aws.output.folder\'] + \'/\' + file, \'.parquet\'):
# Run the parquet converter
print(\'**** Processing %s ****\' % file)
# TODO: number of repartition variable
df = SparkUtils.get_df_from_s3(self.spark_session, file, self.config[\'aws.bucket\']).repartition(94)
s3folderpath = \'s3a://\' + self.config[\'aws.bucket\'] + \\
\'/\' + self.config[\'aws.output.folder\'] + \\
\'/%s\' % file + \'/\'
print(\'Writing down process\')
df.write.format(\'parquet\').mode(\'append\').save(
\'%s\' % s3folderpath)
print(\'**** Saving %s completed ****\' % file)
df.unpersist()
else:
print(\'Parquet files already exist!\')
因此,作为第一步,这段代码在 s3 存储桶中搜索这些 parquet 文件是否存在,如果不存在,它将进入 for 循环并运行所有转换。
现在,让我们进入正题。我有这个管道,它适用于每个 csv 文件,除了一个与其他文件相同的管道,除了在镶木地板中重新分区和转换后也更重(29 MB x 94 部分与 900 kB x 32 部分)。
这会在过程中的一段时间后导致瓶颈(分为相同的周期,其中周期数等于重新分区的数量),从而提高了java堆内存空间几个警告后的问题:
WARN TaskSetManager: Stage X contains a task of very large size (x KB). The maximum recommended size is 100 KB。 (另见下面的图片)
第1部分]:
第2部分
最合乎逻辑的解决方案是进一步增加 repartition 参数以降低每个 parquet 文件的权重,但它不允许我创建超过 94 个分区,在 for 循环中的一段时间后(如上所述)它会引发此错误:
ERROR FileFormatWriter: Aborting job 8fc9c89f-dccd-400c-af6f-dfb312df0c72.
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: HGC6JTRN5VT5ERRR, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method., S3 Extended Request ID: 7VBu4mUEmiAWkjLoRPruTiCY3IhpK40t+lg77HDNC0lTtc8h2Zi1K1XGSnJhjwnbLagN/kS+TpQ=
或者还有:
第二种问题类型,注意警告
我注意到我可以对与原始值相关的文件进行分区:我可以使用 16 作为参数而不是 94,它会运行良好,但如果我将它增加到原始值 94 以上,它会赢\'不工作。
请记住,这条管道运行良好直到最后对于其他(较轻的)CSV 文件,这里唯一的变量似乎是输入文件(特别是大小),它似乎会在一段时间后停止。如果您需要任何其他详细信息,请告诉我,如果您能帮助我,我将非常高兴。提前谢谢大家。
标签: python apache-spark pyspark apache-spark-sql etl