【问题标题】:Pyspark explode json stringPyspark 爆炸 json 字符串
【发布时间】:2020-12-10 14:22:35
【问题描述】:

Input_dataframe

id  name     collection
111 aaaaa    {"1":{"city":"city_1","state":"state_1","country":"country_1"},
              "2":{"city":"city_2","state":"state_2","country":"country_2"},
              "3":{"city":"city_3","state":"state_3","country":"country_3"}
             }
222 bbbbb    {"1":{"city":"city_1","state":"state_1","country":"country_1"},
              "2":{"city":"city_2","state":"state_2","country":"country_2"},
              "3":{"city":"city_3","state":"state_3","country":"country_3"}
              }

这里

id ==> string
name ==> string
collection ==> string (string representation of JSON_data)

我想要这样的东西

输出数据帧

id  name   key  value
111 aaaaa  "1"  {"city":"city_1","state":"state_1","country":"country_1"},
111 aaaaa  "2"  {"city":"city_2","state":"state_2","country":"country_2"},
111 aaaaa  "3"  {"city":"city_3","state":"state_3","country":"country_3"}             
222 bbbbb  "1"  {"city":"city_1","state":"state_1","country":"country_1"},
222 bbbbb  "2"  {"city":"city_2","state":"state_2","country":"country_2"},
222 bbbbb  "3"  {"city":"city_3","state":"state_3","country":"country_3"}

如果我的collection 属性类型是maparray,那么explode 函数将完成我的任务。但我有collection 作为字符串类型(JSON_data)

如何获取 output_dataframe?

请告诉我

注意 集合属性可能具有嵌套且不可预测的架构。

{
  "1":{"city":"city_1","state":"state_1","country":"country_1"},          
  "2":{"city":"city_2","state":"state_2","country":"country_2","a":  
       {"aa":"111"}},
  "3":{"city":"city_3","state":"state_3"}
             }

【问题讨论】:

    标签: python-3.x dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    你有这个函数from_json 可以完成这项工作。它会转换你的字符串,然后你可以使用explode。

    【讨论】:

    • 嗨史蒂文,from_json 将帮助我们将 json_type 转换为 Map_type,我明白了。但这是问题所在,我有嵌套的 json 数据和模式是不可预测的。我只知道键是字符串,值是嵌套的json,还有一个嵌套的json。
    【解决方案2】:

    给出 json 架构并获取列的值,然后我从 json 中创建 struct 列。

    import pyspark.sql.functions as f
    from pyspark.sql.types import *
    
    schema = StructType([
        StructField('1', StructType([
            StructField('city', StringType(), True),
            StructField('state', StringType(), True),
            StructField('country', StringType(), True),
        ]), True),
        StructField('2', StructType([
            StructField('city', StringType(), True),
            StructField('state', StringType(), True),
            StructField('country', StringType(), True),
        ]), True),
        StructField('3', StructType([
            StructField('city', StringType(), True),
            StructField('state', StringType(), True),
            StructField('country', StringType(), True),
        ]), True),
    ])
    
    
    
    df2 = df.withColumn('collection', f.from_json('collection', schema))
    cols = df2.select('collection.*').columns
    
    df2.withColumn('collection', f.arrays_zip(f.array(*map(lambda x: f.lit(x), cols)), f.array('collection.*'))) \
       .withColumn('collection', f.explode('collection')) \
       .withColumn('key', f.col('collection.0')) \
       .withColumn('value', f.col('collection.1')) \
       .drop('collection').show(10, False)
    
    
    +---+-----+---+----------------------------+
    |id |name |key|value                       |
    +---+-----+---+----------------------------+
    |111|aaaaa|1  |[city_1, state_1, country_1]|
    |111|aaaaa|2  |[city_2, state_2, country_2]|
    |111|aaaaa|3  |[city_3, state_3, country_3]|
    |222|bbbbb|1  |[city_1, state_1, country_1]|
    |222|bbbbb|2  |[city_2, state_2, country_2]|
    |222|bbbbb|3  |[city_3, state_3, country_3]|
    +---+-----+---+----------------------------+
    

    【讨论】:

    • 嗨 Lamanus,很抱歉打扰你 在上面的例子中,我只是在Collection attribute 中提供了json string 的一部分。实际上我的Collection 属性数据因每一行而异,因此之前很难指定架构。有一点很清楚,我的集合属性由 key==> 嵌套映射和嵌套映射等组成。` {"1" : {.........{}.....{}}, " 2" : {.........{}.....{}.....{}.....{}}, "3" : {....... ..{}}} `
    • 如果您在collection 中看到上述小示例数据对于每一行都是不同的,这就是为什么我要将整个集合保留为单独的列实际上我的集合属性数据会动态变化,
    • 请查看问题中的NOTE
    【解决方案3】:

    这是一个 hacky 解决方案(不理想,因为它使用底层 RDD),但我已经在架构不一致且看起来很健壮的场景中对其进行了测试:

    from pyspark.sql import Row
    
    rdd1 = df.rdd
    
    rdd1.map(lambda x: [(key, val) if key != 'collection' else (key, eval(val))
                   for key, val in x.asDict().items()])
        .map(lambda x: Row(**dict(x)))
        .toDF().show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-06-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-20
      相关资源
      最近更新 更多