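【Posted】:2018-05-11 08:19:41
【Question】:
I have the following simple AWS Glue script. I have a text file that contains empty cells and a table that accepts NULL values. When I run the Glue job, it fails with the exception "Don't know how to save NullType to REDSHIFT".
How can I handle this, or does Redshift via Glue not support inserting NULLs?
Job script: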
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", "decimal(10,2)", "ordernetamount", "decimal(10,2)"), 
## ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", 
"decimal(10,2)", "ordernetamount", "decimal(10,2)"), ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
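Update:
I have made some progress. I thought the problem was the null character (0x00), but it was not. I regenerated the file without any NUL characters and hit the same problem.
I added this line of code.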
df = DropNullFields.apply(frame = resolvechoice4, transformation_ctx = "df")
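I don't fully understand why, but the best I can gather is that the DynamicFrame infers some NullType fields that do not actually exist. After adding this line, rows were inserted, but my string fields do not appear to be included. Only about half of my fields have values.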
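A possible alternative to dropping those fields, sketched under the assumption that the failing columns are ones Spark typed as NullType because every value it saw was empty: convert the DynamicFrame to a DataFrame and cast those columns to a concrete type, so they are written as NULLs rather than removed. The helper names `null_type_columns` and `cast_null_columns` and the default target type `string` are illustrative and not part of Glue. The type strings checked below are also an assumption: to my knowledge `DataFrame.dtypes` reports NullType as "null" on Spark 2.x and as "void" on newer versions.

```python
def null_type_columns(dtypes):
    """Return the names of columns whose Spark type is NullType.

    `dtypes` is a list of (column_name, type_string) pairs, in the
    shape returned by DataFrame.dtypes.
    """
    # Assumption: NullType shows up as "null" (Spark 2.x) or "void" (newer).
    return [name for name, type_name in dtypes if type_name in ("null", "void")]


def cast_null_columns(df, target_type="string"):
    """Cast every NullType column of a Spark DataFrame to `target_type`.

    The values stay NULL; only the column type becomes concrete.
    """
    # Imported lazily so the pure helper above works without pyspark.
    from pyspark.sql.functions import col

    for name in null_type_columns(df.dtypes):
        df = df.withColumn(name, col(name).cast(target_type))
    return df


# Hypothetical use inside the job above (names taken from the script):
# from awsglue.dynamicframe import DynamicFrame
# fixed_df = cast_null_columns(resolvechoice4.toDF())
# fixed = DynamicFrame.fromDF(fixed_df, glueContext, "fixed")
```

In practice the target type would be chosen per column to match the Redshift table; `string` is only a placeholder default here.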
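【Discussion】:
-
I can't seem to figure out how to replace values inline. For example, the link below refers to the na.fill() and fillna() functions of the DataFrame class. Glue uses DynamicFrame, an abstraction over DataFrame, which apparently does not implement .fillna() or its alias. The DropNullFields() transform of the DynamicFrame class seems to drop the entire field if it contains NULL values, rather than omitting only the NULL values within the field. spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
-
We have given up on AWS Glue for now.
-
I use AWS Glue to manage my ETL loads. What I do is: whenever I need to apply a transformation, add a column, or do a calculation, I convert the dynamic frame to a Spark DataFrame with: dataframe.toDF()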
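The toDF() round trip mentioned in the comments can also cover the inline replacement asked about earlier. This is a minimal sketch, not a confirmed fix for the Redshift error: it converts the DynamicFrame to a DataFrame, fills NULLs with `na.fill()`, and converts back with `DynamicFrame.fromDF()`. The helpers `build_fill_map` and `fill_nulls`, and the idea of choosing defaults by type string, are assumptions made for illustration.

```python
def build_fill_map(dtypes, defaults_by_type):
    """Map each column name to a default value chosen by its type string.

    `dtypes` is a list of (column_name, type_string) pairs, in the shape
    returned by DataFrame.dtypes. Columns whose type has no entry in
    `defaults_by_type` are left out and therefore keep their NULLs.
    """
    return {
        name: defaults_by_type[type_name]
        for name, type_name in dtypes
        if type_name in defaults_by_type
    }


def fill_nulls(dynamic_frame, glue_context, defaults_by_type, name="filled"):
    """Fill NULLs in a Glue DynamicFrame via a Spark DataFrame round trip."""
    # Imported lazily: awsglue is only available inside the Glue runtime.
    from awsglue.dynamicframe import DynamicFrame

    df = dynamic_frame.toDF()
    filled = df.na.fill(build_fill_map(df.dtypes, defaults_by_type))
    return DynamicFrame.fromDF(filled, glue_context, name)


# Hypothetical use with the job's variables:
# filled = fill_nulls(resolvechoice4, glueContext, {"string": "", "int": 0})
```

Note that filling replaces NULLs with real values, so it changes what lands in Redshift. If the target table should keep NULLs, leaving the column out of the fill map preserves them.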
Tags: python-3.x amazon-redshift etl aws-glue