【发布时间】:2019-03-19 17:31:19
【问题描述】:
我有 CSV 数据,这些数据是通过胶水爬虫爬取的,并最终出现在一个表中。
我正在尝试运行 ETL 作业以将磁盘上的数据重新分区到日期列的某些组件中。然后将 CSV 转换为 parquet。
即我的数据中有一个名为“日期”的列,并且想在 s3 上将数据分区为年、月、日分区。
我能够转换为镶木地板并使其在序列号值(不同的列)上正确分区,但它会将值“__HIVE_DEFAULT_PARTITION__”放入与日期相关的所有值年、月和日分区。
我可以在其他列(如序列号)上进行分区,但年/月/日不在原始数据集中,因此我的方法是将日期列中的值创建为新列在数据集中并告诉 write_dynamic_frame 函数按列分区,但这不起作用。
一般来说,我是 spark/pyspark 和胶水的新手,所以我很可能会遗漏一些简单的东西。
感谢任何提供帮助的人。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
to_spark_df4 = dropnullfields3.toDF()
with_file_name_df5 = to_spark_df4.withColumn("input_file_name", F.input_file_name()).withColumn('year', F.year(F.col("date").cast("date"))).withColumn('month', F.month(F.col("date").cast("date"))).withColumn('day', F.dayofmonth(F.col("date").cast("date")))
back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = back_to_glue_df8, connection_type = "s3", connection_options = {"path": "s3://output/path","partitionKeys": ["serial-number","year", "month","day"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
结果是我在 s3 中的键最终看起来像这样:
serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet
更新:编辑格式化
【问题讨论】:
-
如果 partitionKey 具有 NULL 值,则创建
__HIVE_DEFAULT_PARTITION__。date字段是否已填充?当你这样做时:.withColumn('year', F.year(F.col("date").cast("date")))我注意到你以前有一个ApplyMapping将字段和类型从date映射到 date, I wonder if you need to do that _after_ the dataframe is converted back to a glue dynamicframe? I look at thoseApplyMapping` 自动生成的步骤并阅读文档,除了重命名列或更改类型我想知道有什么意义?向 Glue 提供字段元数据可能是一种技巧。 -
我在 Athena 文档中读到分区类型必须是原始类型,它没有枚举,但我认为这意味着从
year、month、dayofmonth函数返回的整数应该可以。我想也许我已经读过类型需要是字符串,但这是一个与您在原生 pyspark 中非常接近的有效示例:stackoverflow.com/a/41739138/1335793 这是关于使用 ApplyMapping after 您新添加的列 @987654322 的评论@
标签: apache-spark amazon-s3 pyspark boto3 aws-glue