【问题标题】:Read JSON file as Pyspark Dataframe using PySpark?使用 PySpark 将 JSON 文件读取为 Pyspark Dataframe?
【发布时间】:2018-08-30 03:42:37
【问题描述】:

如何读取以下 JSON 结构以使用 PySpark 触发数据帧?

我的 JSON 结构

{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}

我试过了:

df = spark.read.json('simple.json');

我希望输出 a、b、c 作为列,将值作为相应的行。

谢谢。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    Json 字符串变量

    如果你有 json 字符串作为变量 那么你可以这样做

    simple_json = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
    rddjson = sc.parallelize([simple_json])
    df = sqlContext.read.json(rddjson)
    
    from pyspark.sql import functions as F
    df.select(F.explode(df.results).alias('results')).select('results.*').show(truncate=False)
    

    这会给你

    +---+---+----+
    |a  |b  |c   |
    +---+---+----+
    |1  |2  |name|
    |2  |5  |foo |
    +---+---+----+
    

    Json 字符串作为文件中的单独行(sparkContext 和 sqlContext)

    如果您在文件中有 json 字符串作为单独的行,那么您可以 使用 sparkContext 将其读取到 rdd[string] 中,其余过程相同如上

    rddjson = sc.textFile('/home/anahcolus/IdeaProjects/pythonSpark/test.csv')
    df = sqlContext.read.json(rddjson)
    df.select(F.explode(df['results']).alias('results')).select('results.*').show(truncate=False)
    

    Json 字符串作为文件中的单独行(仅限 sqlContext)

    如果您将 json 字符串作为文件中的单独行,那么您只能使用 sqlContext。但是这个过程很复杂,因为你必须为它创建架构

    df = sqlContext.read.text('path to the file')
    
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    df = df.select(F.from_json(df.value, T.StructType([T.StructField('results', T.ArrayType(T.StructType([T.StructField('a', T.IntegerType()), T.StructField('b', T.IntegerType()), T.StructField('c', T.StringType())])))])).alias('results'))
    df.select(F.explode(df['results.results']).alias('results')).select('results.*').show(truncate=False)
    

    这应该给你与上述结果相同的结果

    希望回答对你有帮助

    【讨论】:

    • 5 小时后降落在这里,我只能用 pandas 找到解决方案,但不能用 pyspark。谢谢。虽然用模式阅读它是相当棘手的
    【解决方案2】:
    !pip install findspark
    !pip install pyspark
    import findspark
    import pyspark
    findspark.init()
    sc = pyspark.SparkContext.getOrCreate()
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('abc').getOrCreate()
    

    让我们生成自己的 JSON 数据 这样我们就不必访问文件系统了。

    stringJSONRDD = sc.parallelize((""" 
      { "id": "123",
        "name": "Katie",
        "age": 19,
        "eyeColor": "brown"
      }""",
       """{
        "id": "234",
        "name": "Michael",
        "age": 22,
        "eyeColor": "green"
      }""", 
      """{
        "id": "345",
        "name": "Simone",
        "age": 23,
        "eyeColor": "blue"
      }""")
    )
    

    然后创建数据帧

    swimmersJSON = spark.read.json(stringJSONRDD)
    

    创建临时表

    swimmersJSON.createOrReplaceTempView("swimmersJSON")
    

    希望这对您有所帮助。完整代码可以参考这个GitHub repository

    【讨论】:

      【解决方案3】:
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col
      from pyspark.sql.functions import explode
      
      spark = SparkSession.builder.getOrCreate()
      sc = spark.sparkContext
      json_data = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
      json_rdd = sc.parallelize([json_data])
      df = spark.read.json(json_rdd)
      df =df.withColumn("results", explode(df.results)).select( 
                               col("results.a").alias("a"),
                               col("results.b").alias("b"),
                               col("results.c").alias("c") ) 
      df.show()
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-12-17
        • 1970-01-01
        • 1970-01-01
        • 2023-04-07
        • 2021-06-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多