[Title]: AWS Glue - Convert the JSON response from a GET (REST API) request to a DataFrame/DynamicFrame and store it in an S3 bucket
[Posted]: 2021-07-27 20:17:53
[Question]:
headersAPI = {
    'Content-Type': 'application/json',
    'accept': 'application/json',
    'Authorization': 'Bearer XXXXXXXXXXXXXXXXXXXXXXXXXX',
}
skill_response = requests.get("XXXXXX", headers=headersAPI)

log.info(skill_response.text)
skill_json=skill_response.json()
print(skill_json)  # printed the JSON data and verified it
    
log.info('skills data')
log.info(skill_json["status"]) 
        
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame=skill_json, connection_type="s3", format="csv",
    connection_options={"path": "s3://xxxxx/", "partitionKeys": []},
    transformation_ctx="DataSink0")

job.commit()

TypeError: frame_or_dfc must be a DynamicFrame or DynamicFrameCollection. Got

I get this error when writing to S3: 'dict' object has no attribute '_jdf'
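For context, `skill_response.json()` returns a plain Python dict, while `write_dynamic_frame` expects a DynamicFrame, which is what triggers the TypeError. Leaving the Spark parts aside, a minimal sketch (with a made-up payload standing in for the real API response) of serializing the dict back into a JSON string that Spark could then parse:

```python
import json

# Stand-in for the parsed API response; skill_response.json() returns a dict like this
skill_json = {"status": "ok", "skills": [{"name": "glue"}, {"name": "spark"}]}

# A dict cannot be handed to write_dynamic_frame directly. It first has to be
# turned back into a JSON string (or the raw response text used instead), so
# that Spark can read it, e.g. spark.read.json(sc.parallelize([payload])).
payload = json.dumps(skill_json)

print(type(payload).__name__)
```
In practice it is simpler to skip the round trip and feed `skill_response.text` (the raw response string) to Spark directly.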

[Discussion]:

Tags: python amazon-s3 aws-glue aws-glue-data-catalog aws-glue-spark


[Solution 1]:

Converting the JSON response to a DynamicFrame can be done by first creating a DataFrame from the response string (discussed here) and then converting that DataFrame to a DynamicFrame.

This example should work:

    import requests
    from awsglue.job import Job
    from pyspark.context import SparkContext
    
    from awsglue import DynamicFrame
    from awsglue.context import GlueContext
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    
# Fetch the JSON payload from the REST API
r = requests.get(url='https://api.github.com/users?since=100')

# Parse the raw response text into a Spark DataFrame
df = spark.read.json(sc.parallelize([r.text]))

# Convert the DataFrame to a Glue DynamicFrame
dynamic_frame = DynamicFrame.fromDF(
    df, glue_ctx=glueContext, name="df"
)
    
    #dynamic_frame.show()
    
    DataSink0 = glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3", format="csv",
        connection_options={"path": "s3://xxxxx/",
                            "partitionKeys": []},
        transformation_ctx="DataSink0")
    job.commit()
    

[Comments]:

• Thanks, Johannes. I can see the file in the S3 bucket. The filename looks odd: run-DataSink0-6-part-r-00000, with no .json extension. The data looks fine when viewed in Notepad. Can the filename be changed to skill_data.json?
• Unfortunately we can't. That is how Spark names its output files.
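As the comment above notes, Spark picks the part-file names itself. A common workaround is to copy the part object to a fixed key after `job.commit()`. A sketch, assuming boto3 is available in the job, the write produced a single part file, and using placeholder bucket/prefix/filename values (the `find_part_key`/`rename_output` helpers are made up for illustration):

```python
def find_part_key(keys, prefix):
    """Return the first key under `prefix` that looks like a Spark part file."""
    for key in keys:
        if not key.startswith(prefix):
            continue
        name = key.rsplit("/", 1)[-1]
        if "part-" in name and not name.endswith("_SUCCESS"):
            return key
    return None

def rename_output(bucket, prefix, target_key):
    """Copy the single Spark part file to a fixed name, then delete the original."""
    import boto3  # imported here so find_part_key stays usable without AWS
    s3 = boto3.client("s3")
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    keys = [obj["Key"] for obj in resp.get("Contents", [])]
    part_key = find_part_key(keys, prefix)
    if part_key is None:
        raise RuntimeError("no part file found under s3://%s/%s" % (bucket, prefix))
    s3.copy_object(Bucket=bucket,
                   CopySource={"Bucket": bucket, "Key": part_key},
                   Key=target_key)
    s3.delete_object(Bucket=bucket, Key=part_key)

# Example (placeholder names):
# rename_output("xxxxx", "output/", "output/skill_data.json")
```
Note this only works cleanly when the frame is written with a single partition; with multiple part files you would need to merge them instead.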