【问题标题】:__HIVE_DEFAULT_PARTITION__ as partition value in glue ETL job__HIVE_DEFAULT_PARTITION__ 作为胶水 ETL 作业中的分区值
【发布时间】:2019-03-19 17:31:19
【问题描述】:

我有 CSV 数据,这些数据是通过胶水爬虫爬取的,并最终出现在一个表中。

我正在尝试运行 ETL 作业以将磁盘上的数据重新分区到日期列的某些组件中。然后将 CSV 转换为 parquet。

即我的数据中有一个名为“日期”的列,并且想在 s3 上将数据分区为年、月、日分区。

我能够转换为镶木地板并使其在序列号值(不同的列)上正确分区,但它会将值“__HIVE_DEFAULT_PARTITION__”放入与日期相关的所有值年、月和日分区。

我可以在其他列(如序列号)上进行分区,但年/月/日不在原始数据集中,因此我的方法是将日期列中的值创建为新列在数据集中并告诉 write_dynamic_frame 函数按列分区,但这不起作用。

一般来说,我是 spark/pyspark 和胶水的新手,所以我很可能会遗漏一些简单的东西。

感谢任何提供帮助的人。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

to_spark_df4 = dropnullfields3.toDF()

with_file_name_df5 = to_spark_df4.withColumn("input_file_name", F.input_file_name()).withColumn('year', F.year(F.col("date").cast("date"))).withColumn('month', F.month(F.col("date").cast("date"))).withColumn('day', F.dayofmonth(F.col("date").cast("date")))

back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")


datasink4 = glueContext.write_dynamic_frame.from_options(frame = back_to_glue_df8, connection_type = "s3", connection_options = {"path": "s3://output/path","partitionKeys": ["serial-number","year", "month","day"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

结果是我在 s3 中的键最终看起来像这样:

serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet

更新:编辑格式化

【问题讨论】:

  • 如果 partitionKey 具有 NULL 值,则创建 __HIVE_DEFAULT_PARTITION__date 字段是否已填充?当你这样做时:.withColumn('year', F.year(F.col("date").cast("date"))) 我注意到你以前有一个ApplyMapping 将字段和类型从date 映射到 date, I wonder if you need to do that _after_ the dataframe is converted back to a glue dynamicframe? I look at those ApplyMapping` 自动生成的步骤并阅读文档,除了重命名列或更改类型我想知道有什么意义?向 Glue 提供字段元数据可能是一种技巧。
  • 我在 Athena 文档中读到分区类型必须是原始类型,它没有枚举,但我认为这意味着从 yearmonthdayofmonth 函数返回的整数应该可以。我想也许我已经读过类型需要是字符串,但这是一个与您在原生 pyspark 中非常接近的有效示例:stackoverflow.com/a/41739138/1335793 这是关于使用 ApplyMapping after 您新添加的列 @987654322 的评论@

标签: apache-spark amazon-s3 pyspark boto3 aws-glue


【解决方案1】:

我的工作与你的非常相似。我希望你现在设法解决它,但无论如何,这是解决你困境的方法:

基本解决方案:

from pyspark.sql.functions import year, month, dayofmonth

###### rest of your code until ApplyMapping included ######

# add year, month & day columns, non zero-padded
df = df.toDF()
df = df.withColumn('year', year(df.date))\
       .withColumn('month', month(df.date))\
       .withColumn('day', dayofmonth(df.date))

补充说明:

如果您需要在要选择日期范围的 Athena 上运行查询,我建议您避免使用嵌套分区(如年 -> 月 -> 日),而是使用平面分区架构。这样做的原因是查询变得更容易编写。这是获取平面模式的python代码:

from pyspark.sql.functions import date_format

###### rest of your code until ApplyMapping included ######

df = df.toDF()
df = df.withColumn('date_2', date_format(df.date, 'yyyy-MM-dd'))

# date_2 is because column "date" already exists,
# but we want the partitioning one to be in a string format.
# You can later drop the original column if you wish.

假设您现在要查询从 2020 年 3 月 15 日到 2020 年 4 月 3 日的数据。

这是基于您选择的分区架构的 SQL 查询。

嵌套架构

SELECT item_1, item_2
  FROM my_table
 WHERE year = 2020
   AND (
          (month = 3 AND day >= 15)
       OR (month = 4 AND day <= 3)
       )

平面架构

SELECT item_1, item_2
  FROM my_table
 WHERE date BETWEEN '2020-03-15' AND '2020-04-3'

此外,鉴于您的“日期”列存储为字符串,您将能够使用 LIKE 运算符运行查询。

例如,如果您想查询数据库中每年四月的所有数据,您可以这样做:

SELECT item_1, item_2
  FROM my_table
 WHERE date LIKE '%-04-%'

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-12
    • 2019-08-30
    • 1970-01-01
    • 1970-01-01
    • 2022-08-09
    相关资源
    最近更新 更多