【发布时间】:2023-04-07 00:33:01
【问题描述】:
我正在创建一个粘合作业,该作业需要处理来自 s3 路径 - s3://<path>/<year>/<month>/<day>/<hour>/ 的每日 4TB 数据量。因此,我创建了一个循环,按小时文件夹(每个 155Gb)将数据读取到 spark df 中,过滤某些类别并作为按过滤类别(s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/)分区的镶木地板文件写回 s3。我正在使用 60 个 G2.X 工作节点,每个节点都有(8 个 vCPU、32 GB 内存、128 GB 磁盘)。 S3 写入非常慢,需要 10 多个小时才能完成运行。除了增加节点数量之外,有没有办法加快/优化 s3 写入?
def s3_load_job(input_list):
hour, year, month, day = input_list
logger.info(f"hour in s3 func {hour}")
# get data from s3
s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
logger.info(f"print s3 path {s3_path}")
#user defined library function that return spark df
df = get_df_from_s3(glueContext, s3_path)
df = df.withColumn('category', F.lower(F.col('category')))
df_rep = df.where(F.col('category').isin({ "A", "B", "C","D"}))
#write to s3
datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
glueContext.write_dynamic_frame.from_options(frame = datasink4,
connection_type = "s3",
connection_options =
{"path":"s3://<path>/"
,"partitionKeys"["category","year","month","day","hour"]}
,format = "glueparquet" )
def main():
year = '2020'
month = '08'
day = '01'
hours = ["%.2d" % i for i in range(24)]
input_list = [[hour, year, month, day] for hour in hours]
logger.info(f"input_list {input_list}")
for i in input_list:
s3_load_job(i)
job.commit()
if __name__ == "__main__":
main()
【问题讨论】:
-
目标文件的平均大小是多少?是否超过 256 MB 块??
-
过滤后,每个类别写入的大小不同(倾斜),'A'约为600GB,'B'400GB,其余各为250G。写入的目标文件的大小是由胶水在内部计算的(优化为使用所有 cpu 内核),据我所知,每个文件的范围在 1-15MB 之间。在将动态帧写入 s3 时,我不使用任何合并/重新分区以避免内存错误。
标签: apache-spark amazon-s3 pyspark aws-glue