[Question Title]: AWS Glue PySpark transformation Filter API not working
[Posted]: 2020-08-10 17:48:10
[Question Description]:

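I'm new to AWS Glue and Python. I'm trying to use Filter.apply on the DynamicFrame datasource0 to produce filter3frame. The job run fails, and the logs say the filter_sex function is not defined. The exact error is "NameError: name 'filter_sex' is not defined". Can anyone tell me what I'm doing wrong?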

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "amssurvey", table_name = "amssurvey", transformation_ctx = "datasource0")


filter1frame = Filter.apply(frame=datasource0, f=lambda x:x['citizen'] in ["US"])

filter2frame = Filter.apply(frame=datasource0, f=lambda x:x['count'] > 50)

filter3frame = Filter.apply(frame=datasource0, f=filter_sex(datasource0))

filter1_op = glueContext.write_dynamic_frame.from_options(frame = filter1frame, connection_type = "s3", connection_options = {"path": "s3://asgqatestautomation3/SourceFiles/filter1_op"}, format = "csv", transformation_ctx = "filter1_op")
filter2_op = glueContext.write_dynamic_frame.from_options(frame = filter2frame, connection_type = "s3", connection_options = {"path": "s3://asgqatestautomation3/SourceFiles/filter2_op"}, format = "csv", transformation_ctx = "filter2_op")
filter3_op = glueContext.write_dynamic_frame.from_options(frame = filter3frame, connection_type = "s3", connection_options = {"path": "s3://asgqatestautomation3/SourceFiles/filter3_op"}, format = "csv", transformation_ctx = "filter3_op")
job.commit()



def filter_sex(item):
    if item['sex'] == 'Male':
        return True
    else:
        return False
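The NameError comes from execution order: a Glue job script is an ordinary Python module that runs top to bottom, so the `def filter_sex` at the very end has not executed yet when the `filter3frame = ...` line runs. The sketch below reproduces this in plain Python with no Glue dependency; the two script strings and the `run` helper are hypothetical, written only to mimic the layout of the job above.

```python
# Plain-Python reproduction of the NameError (no AWS Glue needed).
# Both scripts are hypothetical; the first mimics the question's layout,
# where filter_sex is used before its `def` statement has run.
SCRIPT_USE_BEFORE_DEF = """
result = filter_sex({'sex': 'Male'})   # runs before the def below

def filter_sex(item):
    return item['sex'] == 'Male'
"""

SCRIPT_DEF_FIRST = """
def filter_sex(item):
    return item['sex'] == 'Male'

result = filter_sex({'sex': 'Male'})   # the def has already executed
"""


def run(script):
    """Execute a script in a fresh namespace and report what happened."""
    namespace = {}
    try:
        exec(script, namespace)
        return ("ok", namespace["result"])
    except NameError as err:
        return ("NameError", str(err))


print(run(SCRIPT_USE_BEFORE_DEF))  # ('NameError', "name 'filter_sex' is not defined")
print(run(SCRIPT_DEF_FIRST))       # ('ok', True)
```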

[Question Discussion]:

    Tags: python-3.x amazon-web-services aws-lambda aws-glue


    [Solution 1]:

    Instead of defining a separate function, why not try the code below:

    filter3frame = Filter.apply(frame=datasource0, f=lambda x:x['sex'] == 'Male')
    

    As for the NameError itself: filter_sex must be defined before the line that uses it.
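A lambda and a named function are interchangeable as the f argument; what matters is that f takes one record and returns a boolean. The comparison operator does matter, though: x['sex'] > 'Male' is a lexicographic string comparison, not an equality test, so it keeps neither 'Male' nor 'Female' rows. The sketch below assumes plain dicts as local stand-ins for Glue's DynamicRecord and uses Python's built-in filter in place of Filter.apply; it only illustrates the predicate logic.

```python
# Plain dicts stand in for Glue DynamicRecords (assumption for local testing).
rows = [{"sex": "Male"}, {"sex": "Female"}, {"sex": "Male"}]


def filter_sex(item):
    """Named predicate: keep records whose 'sex' field equals 'Male'."""
    return item["sex"] == "Male"


male_named = list(filter(filter_sex, rows))                     # named function
male_lambda = list(filter(lambda x: x["sex"] == "Male", rows))  # same test as a lambda
male_gt = list(filter(lambda x: x["sex"] > "Male", rows))       # '>' compares strings

print(len(male_named), len(male_lambda), len(male_gt))  # 2 2 0
```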

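    [Discussion]:

    • Yes, but I want to write a separate function instead of a lambda.
    • I tried defining the filter_sex function right after the import statements, but now I get a different error: "TypeError: 'DynamicFrame' object is not subscriptable". Can you help?
    • Sorry @kdkarthik, I'm not a Python expert, but there seems to be a data type mismatch between the filter_sex function's input parameter and the input you are passing. I'd guess your function expects a DataFrame column.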
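One plausible reading of that TypeError, offered as a sketch: with f=filter_sex(datasource0), Python calls filter_sex immediately and passes the whole DynamicFrame as item, so item['sex'] fails because the frame does not support [...] indexing. The class FrameStandIn below is a hypothetical stand-in for a DynamicFrame that, like it in the reported error, cannot be subscripted.

```python
class FrameStandIn:
    """Hypothetical stand-in for a DynamicFrame: it defines no __getitem__,
    so frame['sex'] raises TypeError, matching the error in the comment."""


def filter_sex(item):
    return item["sex"] == "Male"


frame = FrameStandIn()

try:
    # This is what f=filter_sex(datasource0) does: it calls the predicate
    # right away with the whole frame, instead of letting the filter call
    # it once per record.
    filter_sex(frame)
    error = None
except TypeError as err:
    error = str(err)

print(error)  # 'FrameStandIn' object is not subscriptable
```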
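    [Solution 2]:

    I fixed it.

    As @QuickSilver said, every function has to be defined before it is used. In addition, f must be given the function itself, not the result of calling it: filter_sex is passed without any argument, because Filter.apply calls it on each record of the DynamicFrame.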

    filter3frame = Filter.apply(frame=datasource0, f=filter_sex) 
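The difference between passing filter_sex and passing filter_sex(...) can be shown without Glue. Filter.apply's f is a predicate that is called once per record, keeping the records for which it returns True. The function apply_filter below is a hypothetical local mimic of only that contract on a list of dicts; it is not Glue's implementation.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


def apply_filter(records: List[Record], f: Callable[[Record], bool]) -> List[Record]:
    """Hypothetical local mimic of Filter.apply's contract: call f on each
    record and keep the ones for which it returns True."""
    if not callable(f):
        raise TypeError(f"f must be callable, got {type(f).__name__}")
    return [r for r in records if f(r)]


def filter_sex(item: Record) -> bool:
    return item["sex"] == "Male"


rows = [{"sex": "Male"}, {"sex": "Female"}]

kept = apply_filter(rows, filter_sex)  # correct: pass the function itself
print(kept)                            # [{'sex': 'Male'}]

try:
    apply_filter(rows, filter_sex(rows[0]))  # wrong: passes the bool True
except TypeError as err:
    print(err)                               # f must be callable, got bool
```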
    

    So the final working code is as follows:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    def filter_sex(item):
        # Keep only records whose 'sex' field is 'Male'.
        return item['sex'] == 'Male'
    
    
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "amssurvey", table_name = "amssurvey", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "amssurvey", table_name = "amssurvey", transformation_ctx = "datasource0")
    ## @type: ApplyMapping
    ## @args: [mapping = [("nomber", "long", "nomber", "long"), ("type", "string", "type", "string"), ("sex", "string", "sex", "string"), ("citizen", "string", "citizen", "string"), ("count", "long", "count", "long"), ("countstate", "long", "countstate", "long")], transformation_ctx = "applymapping1"]
    ## @return: applymapping1
    ## @inputs: [frame = datasource0]
    
    
    filter1frame = Filter.apply(frame=datasource0, f=lambda x:x['citizen'] in ["US"])
    
    filter2frame = Filter.apply(frame=datasource0, f=lambda x:x['count'] > 50)
    
    filter3frame = Filter.apply(frame=datasource0, f=filter_sex)

    filter1_op = glueContext.write_dynamic_frame.from_options(frame = filter1frame, connection_type = "s3", connection_options = {"path": "s3://asgqatestautomation3/SourceFiles/filter1_op"}, format = "csv", transformation_ctx = "filter1_op")
    filter2_op = glueContext.write_dynamic_frame.from_options(frame = filter2frame, connection_type = "s3", connection_options = {"path": "s3://asgqatestautomation3/SourceFiles/filter2_op"}, format = "csv", transformation_ctx = "filter2_op")
    filter3_op = glueContext.write_dynamic_frame.from_options(frame = filter3frame, connection_type = "s3", connection_options = {"path": "s3://asgqatestautomation3/SourceFiles/filter3_op"}, format = "csv", transformation_ctx = "filter3_op")
    job.commit()
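Because these mistakes only surface when the job runs, it can help to check the predicates locally before submitting the script. A minimal sketch, assuming records can be modeled as plain dicts with the citizen, count and sex fields listed in the ApplyMapping annotation; the sample rows are made up for illustration and are not data from the amssurvey table.

```python
def filter_citizen(item):
    # Same test as filter1frame's lambda.
    return item["citizen"] in ["US"]


def filter_count(item):
    # Same test as filter2frame's lambda.
    return item["count"] > 50


def filter_sex(item):
    # Same test as the answer's filter_sex.
    return item["sex"] == "Male"


# Made-up sample rows (not real amssurvey data).
sample = [
    {"citizen": "US", "count": 60, "sex": "Male"},
    {"citizen": "Non-US", "count": 10, "sex": "Female"},
    {"citizen": "US", "count": 40, "sex": "Female"},
]

for name, pred in [("filter1", filter_citizen),
                   ("filter2", filter_count),
                   ("filter3", filter_sex)]:
    kept = [row for row in sample if pred(row)]
    print(name, len(kept))
# filter1 2
# filter2 1
# filter3 1
```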
    
