【问题标题】:How to reduce the time taken to write parquet files to s3 using AWS Glue如何减少使用 AWS Glue 将 parquet 文件写入 s3 所需的时间
【发布时间】:2023-04-07 00:33:01
【问题描述】:

我正在创建一个粘合作业,该作业需要处理来自 s3 路径 - s3://<path>/<year>/<month>/<day>/<hour>/ 的每日 4TB 数据量。因此,我创建了一个循环,按小时文件夹(每个 155Gb)将数据读取到 spark df 中,过滤某些类别并作为按过滤类别(s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/)分区的镶木地板文件写回 s3。我正在使用 60 个 G2.X 工作节点,每个节点都有(8 个 vCPU、32 GB 内存、128 GB 磁盘)。 S3 写入非常慢,需要 10 多个小时才能完成运行。除了增加节点数量之外,有没有办法加快/优化 s3 写入?


def s3_load_job(input_list):

    hour, year, month, day = input_list
    logger.info(f"hour in s3 func {hour}")
    
    # get data from s3
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"print s3 path {s3_path}")

    #user defined library function that return spark df
    df = get_df_from_s3(glueContext, s3_path)

    df = df.withColumn('category', F.lower(F.col('category')))

    df_rep = df.where(F.col('category').isin({ "A", "B", "C","D"}))

    #write to s3
    datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
    
    glueContext.write_dynamic_frame.from_options(frame = datasink4,
                                                             connection_type = "s3",
                                                             connection_options = 
                                                             {"path":"s3://<path>/"
                                           ,"partitionKeys"["category","year","month","day","hour"]}
                                                             ,format = "glueparquet" )



def main():
    
    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]

    input_list = [[hour, year, month, day] for hour in hours]
    logger.info(f"input_list {input_list}")

    for i in input_list:
        s3_load_job(i)
    
    job.commit()



if __name__ == "__main__":
    main()            
       

【问题讨论】:

  • 目标文件的平均大小是多少?是否超过 256 MB 块??
  • 过滤后,每个类别写入的大小不同(倾斜),'A'约为600GB,'B'400GB,其余各为250G。写入的目标文件的大小是由胶水在内部计算的(优化为使用所有 cpu 内核),据我所知,每个文件的范围在 1-15MB 之间。在将动态帧写入 s3 时,我不使用任何合并/重新分区以避免内存错误。

标签: apache-spark amazon-s3 pyspark aws-glue


【解决方案1】:

如果您使用的是 S3(对象存储),请尝试设置以下配置:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2

【讨论】:

  • 谢谢,能否请您解释一下上述代码对 s3 写入优化有何影响?
  • 对于一致性模型意味着基于重命名的提交是安全的对象存储(例如 S3),使用 v.2 算法来提高性能。与 v.1 算法相比,这在作业结束时进行的重命名更​​少。由于它仍然使用 rename() 来提交文件,因此当对象存储没有一致的元数据/列表时使用它是不安全的。提交者也可以设置为在清理临时文件时忽略失败;这降低了暂时性网络问题升级为作业失败的风险。因为存储临时文件会产生费用。
【解决方案2】:

你可以试试下面的

  1. 不要将 pyspark df 转换为 dynamicFrame,因为您可以将 pyspark 数据帧直接保存到 s3。
  2. 由于您获得的文件大小为每个 1MB 到 15MB,因此您需要进行优化。因此,请在将数据帧写入 s3 之前尝试重新分区。

如果您的分区大小为 250 GB,那么您应该创建至少 256 MB 大小的输出文件,或者在 G2.x 的情况下,您还可以创建每个大小为 512 MB 的文件。

你可以做到这一点

每个分区可以生成500个文件500*512 = 250 GB

df.repartition(500,partitionCol).write.partitionBy(partitionCol).parquet(path)

【讨论】:

  • 谢谢,尝试了这种方法,但作业执行时间实际上增加了。我相信时间的增加是由于工作人员在重新分区期间试图跨节点重新洗牌。
【解决方案3】:

看来您一定想出了解决办法。 想分享对我有用的东西。我每小时运行一次粘合作业,启用作业书签以不重新处理旧文件。确保您没有创建太多分区,这不仅会导致更长的执行时间,而且如果您想通过 Athena 进行查询,从长远来看,您的查询可能会超时。将分区保持在最低限度。通过重新分区,您的作业可能会花费太多时间来洗牌数据,这可能会增加作业运行时间。 然而,频繁的每小时运行可能会有所帮助。 请分享对您有用的方法。

【讨论】:

    猜你喜欢
    • 2019-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-03
    • 2020-02-14
    • 2019-06-07
    • 2017-10-29
    相关资源
    最近更新 更多