【问题标题】:How to expand nested JSON into Spark dataframe on AWS glue如何在 AWS 胶水上将嵌套的 JSON 扩展为 Spark 数据框
【发布时间】:2023-03-19 20:04:02
【问题描述】:

使用以下营销 JSON 文件

{
    "request_id": "xx",
    "timeseries_stats": [
        {
            "timeseries_stat": {
                "id": "xx",
                "timeseries": [
                    {
                        "start_time": "xx",
                        "end_time": "xx",
                        "stats": {
                            "impressions": xx,
                            "swipes": xx,
                            "view_completion": xx,
                            "spend": xx
                        }
                    },
                    {
                        "start_time": "xx",
                        "end_time": "xx",
                        "stats": {
                            "impressions": xx,
                            "swipes": xx,
                            "view_completion": xx,
                            "spend": xx
                        }
                    }

我可以很容易地使用 pandas 解析这个并获得所需格式的数据帧

start_time   end_time     impressions   swipes   view_completion    spend
    xx          xx             xx         xx            xx            xx
    xx          xx             xx         xx            xx            xx

但需要在 AWS Glue 上使用 spark。

使用创建初始 spark 数据帧 (df) 后

rdd = sc.parallelize(JSON_resp['timeseries_stats'][0]['timeseries_stat']['timeseries'])
df = rdd.toDF()

我尝试按如下方式扩展 stats

df_expanded = df.select("start_time","end_time","stats.*")

错误:

AnalysisException: 'Can only star expand struct data types. 
Attribute: `ArrayBuffer(stats)`;'

&

from pyspark.sql.functions import explode
df_expanded = df.select("start_time","end_time").withColumn("stats", explode(df.stats))

错误:

AnalysisException: 'The number of aliases supplied in the AS clause does not match the 
number of columns output by the UDTF expected 2 aliases but got stats ;

spark 很新,对于这两种方法中的任何一种,任何帮助都将不胜感激!

这是一个非常相似的问题:

parse array of dictionaries from JSON with Spark

除了我需要展平这个额外的统计键。

【问题讨论】:

    标签: json apache-spark pyspark apache-spark-sql aws-glue


    【解决方案1】:

    当你 explode 一个地图列时,它会给你两列,所以 .withColumn 不起作用。将explodeselect 语句一起使用。

    from pyspark.sql import functions as f
    
    df.select('start_time', 'end_time', f.explode('stats')) \
      .groupBy('start_time', 'end_time').pivot('key').agg(f.first('value')).show()
    
    +----------+--------+-----------+-----+------+---------------+
    |start_time|end_time|impressions|spend|swipes|view_completion|
    +----------+--------+-----------+-----+------+---------------+
    |        yy|      yy|         yy|   yy|    yy|             yy|
    |        xx|      xx|         xx|   xx|    xx|             xx|
    +----------+--------+-----------+-----+------+---------------+
    

    【讨论】:

    • 谢谢!我唯一要做的就是声明import pyspark.sql.functions as f 让它工作。
    猜你喜欢
    • 1970-01-01
    • 2020-03-11
    • 2021-04-27
    • 2020-03-19
    • 1970-01-01
    • 2023-03-03
    • 2020-04-04
    • 2020-09-27
    • 2022-06-21
    相关资源
    最近更新 更多