【Posted】:2022-02-03 01:03:52
【Question】:
I am using AWS Glue ETL to migrate data from PostgreSQL to S3. The job I set up reads the data fine and runs successfully, and a file is written to the correct S3 bucket. The problem is that I can't rename that file - it is given a random name like part-0000-.csv. I'm new to Spark and Glue, so any help is appreciated.
So I tried the following code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from datetime import date
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "default tbl", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("customer_type", "string", "customer_type", "string"), ("project_type", "string", "project_type", "string"), ("device_id", "string", "device_id", "string"),], transformation_ctx = "applymapping1")
today = date.today()
d3 = today.strftime("%Y%m%d")
print("d3 =", d3)
# Convert the DynamicFrame to a Spark DataFrame and collapse it to a single partition before writing
applymapping1 = applymapping1.toDF()
repartitioned1 = applymapping1.repartition(1)
path = "s3://{}/date/" + d3
repartitioned1.write.option("maxRecordsPerFile", 300).option('header', 'true').mode("overwrite").csv(path)
import boto3
client = boto3.client('s3')
s3_bucket = '{}'
srcPrefix = '/date/20220128/'
new_name = 'TrackingReport-' + d3
# List all the objects in the bucket under the date/ prefix
print(s3_bucket)
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(s3_bucket)
for obj in my_bucket.objects.filter(Delimiter='/', Prefix='date/'):
    print(obj.key)
response = client.list_objects(Bucket=s3_bucket, prefix=srcPrefix)
names = response["Contents"]
print("names")
# Find the files whose key contains part-000
particulars = [name['Key'] for name in names if 'part-000' in name['Key']]
print("particulars")
# Take the key prefix that comes before part-000
location = [particular.split('part-000')[0] for particular in particulars]
print("location")
# Copy each part file to a new key, then delete the original
for key, particular in enumerate(particulars):
    client.copy_object(Bucket=s3_bucket, CopySource=s3_bucket + "/" + particular, Key=location[key] + "newfile")
    print("for loop")
    client.delete_object(Bucket=s3_bucket, Key=particular)
job.commit()
The error I get is - ERROR: ParamValidationError: Parameter validation failed:
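For what it's worth, boto3 validates keyword arguments by exact name: list_objects expects Prefix with a capital P, so the lowercase prefix= in the call above is enough to raise a ParamValidationError before any request is sent. Below is a minimal sketch of the list/copy/delete rename step with the parameter capitalized; the bucket name, the prefix, and the TrackingReport-... target key are placeholders, and list_objects_v2 is used in place of list_objects:
import boto3
from datetime import date

client = boto3.client('s3')
s3_bucket = 'my-bucket'  # placeholder bucket name
d3 = date.today().strftime("%Y%m%d")
src_prefix = 'date/' + d3 + '/'  # no leading slash: S3 object keys do not start with '/'
new_key = src_prefix + 'TrackingReport-' + d3 + '.csv'  # hypothetical final name
# Prefix (capital P) is the parameter name boto3 expects
response = client.list_objects_v2(Bucket=s3_bucket, Prefix=src_prefix)
for obj in response.get('Contents', []):
    if 'part-000' in obj['Key']:
        # Server-side copy to the desired name, then remove the original part file
        client.copy_object(Bucket=s3_bucket,
                           CopySource={'Bucket': s3_bucket, 'Key': obj['Key']},
                           Key=new_key)
        client.delete_object(Bucket=s3_bucket, Key=obj['Key'])
With repartition(1) there is only one matching part file, so copying every match to the same new_key is safe; with more partitions each copy would need its own target key.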
【Discussion】: