【问题标题】:Is there a function in pyspark dataframe that is similar to pandas.io.json.json_normalizepyspark 数据框中是否有类似于 pandas.io.json.json_normalize 的函数
【发布时间】:2020-05-09 05:33:12
【问题描述】:

我想执行类似于 pandas.io.json.json_normalize 的操作是 pyspark 数据帧。 spark中是否有等价的功能?

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.io.json.json_normalize.html

【问题讨论】:

    标签: python json pandas pyspark pyspark-dataframes


    【解决方案1】:

    Spark 有类似的功能explode(),但并不完全相同。

    这里是爆炸在非常高的水平上的工作原理。

    >>> from pyspark.sql.functions import explode, col
    
    >>> data = {'A': [1, 2]}
    
    >>> df = spark.createDataFrame(data)
    
    >>> df.show()
     +------+
     |     A|
     +------+
     |[1, 2]|
     +------+
    
    >>> df.select(explode(col('A')).alias('normalized')).show()
    +----------+
    |normalized|
    +----------+
    |         1|
    |         2|
    +----------+
    

    另一方面,您可以使用以下方法将 Spark DataFrame 转换为 Pandas DataFrame:

    • spark_df.toPandas() --> 利用 json_normalize() 然后恢复为 Spark 数据帧。

    • 要恢复为 Spark DataFrame,您可以使用 spark.createDataFrame(pandas_df)

    请注意,这种来回的解决方案并不理想,因为调用 toPandas() 会导致 DataFrame 的所有记录都被收集 (.collect()) 到驱动程序,并且在处理更大的数据集时可能会导致内存错误.

    下面的链接提供了有关使用 toPandas() 的更多信息: DF.topandas() throwing error in pyspark

    希望这会有所帮助,祝你好运!

    【讨论】:

      【解决方案2】:

      在 PySpark 中没有 json_normalize 的直接对应物。但 Spark 提供了不同的选择。如果您在这样的 Dataframe 中有嵌套对象

      one
      |_a
      |_..
      two
      |_b
      |_..
      

      您可以在 Spark 中选择子列,如下所示:

      import pyspark
      from pyspark.sql.session import SparkSession
      spark = SparkSession.builder.appName("stackoverflow demo").getOrCreate()
      columns = ['id', 'one', 'two']
      vals = [
           (1, {"a": False}, {"b": True}),
           (2, {"a": True}, {"b": False})
      ]
      df = spark.createDataFrame(vals, columns)
      df.select("one.a", "two.b").show()
      +-----+-----+
      |    a|    b|
      +-----+-----+
      |false| true|
      | true|false|
      +-----+-----+
      

      如果您使用来自此answer 的递归“展平”函数构建所有嵌套列的展平列表,那么我们将得到一个展平列结构:

      columns = flatten(df.schema)
      df.select(columns)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-03-27
        • 1970-01-01
        • 2011-08-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多