【Title】: How to read JSON strings from CSV properly with Pyspark?
【Posted】: 2020-05-11 00:21:20
【Description】:

I am working with the movies dataset from https://www.kaggle.com/rounakbanik/the-movies-dataset#movies_metadata.csv.

The credits.csv file has three columns: cast, crew and id. The cast and crew cells are filled with JSON-like strings (malformed: keys and values are wrapped in single quotes) that I later want to extract into separate DataFrames. But simply loading the file does not work. This is what I am trying:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('movies').getOrCreate()
df = spark.read.csv(os.path.join(input_path, 'credits.csv'), header=True)
df.printSchema()
df.show()

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: string (nullable = true)

 +--------------------+--------------------+--------------------+
 |                cast|                crew|                  id|
 +--------------------+--------------------+--------------------+
 |[{'cast_id': 14, ...|"[{'credit_id': '...| 'profile_path': ...|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                8844|
 |[{'cast_id': 2, '...|[{'credit_id': '5...|               15602|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 1|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11862|
 |"[{'cast_id': 25,...| 'credit_id': '52...|         'gender': 0|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11860|
 |[{'cast_id': 2, '...|[{'credit_id': '5...|               45325|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                9091|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                 710|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 2|
 |[{'cast_id': 9, '...|"[{'credit_id': '...| 'profile_path': ...|
 |"[{'cast_id': 1, ...| 'credit_id': '56...|         'gender': 0|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 2|
 |"[{'cast_id': 1, ...| 'credit_id': '59...|         'gender': 2|
 |"[{'cast_id': 4, ...| 'credit_id': '52...|         'gender': 2|
 |[{'cast_id': 6, '...|[{'credit_id': '5...|                4584|
 |[{'cast_id': 42, ...|"[{'credit_id': '...| 'profile_path': ...|
 |"[{'cast_id': 1, ...|         'order': 14| 'profile_path': ...|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11517|
 +--------------------+--------------------+--------------------+
 only showing top 20 rows

The id column should contain only numbers, and the cast and crew cells should be loaded as plain strings, which is exactly what happens when I load the data with Pandas:

import os
import pandas as pd

df = pd.read_csv(os.path.join(input_path, 'credits.csv'))
type(df.cast[0])
# str
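(Side note on why these strings are awkward: because the cells use single quotes, they are not valid JSON, but they are valid Python literals. A minimal sketch, using a made-up sample value shaped like a cast cell, that shows `json.loads` rejecting it while `ast.literal_eval` parses it:)

```python
import ast
import json

# Hypothetical cell value, single-quoted like the ones in credits.csv
cell = "[{'cast_id': 14, 'character': 'Woody', 'id': 31}]"

try:
    json.loads(cell)           # single quotes are not valid JSON
except json.JSONDecodeError:
    print('not valid JSON')    # not valid JSON

records = ast.literal_eval(cell)  # but it is a valid Python literal
print(records[0]['character'])    # Woody
```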

How can I load the data into a Spark DataFrame and then collect each row's JSON data into a new DataFrame?

【Comments】:

Tags: python apache-spark pyspark


【Solution 1】:

Setting the escape character to the quote character, together with multiLine=True, works:

    credits = spark.read.csv('credits.csv', header=True, inferSchema=True,
                                     quote='"', escape='"', multiLine=True)
    credits.printSchema()
    credits.show()
    

Output:

    root
     |-- cast: string (nullable = true)
     |-- crew: string (nullable = true)
     |-- id: integer (nullable = true)
    
    +--------------------+--------------------+-----+
    |                cast|                crew|   id|
    +--------------------+--------------------+-----+
    |[{'cast_id': 14, ...|[{'credit_id': '5...|  862|
    |[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
    |[{'cast_id': 2, '...|[{'credit_id': '5...|15602|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|31357|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|11862|
    |[{'cast_id': 25, ...|[{'credit_id': '5...|  949|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|11860|
    |[{'cast_id': 2, '...|[{'credit_id': '5...|45325|
    |[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|  710|
    |[{'cast_id': 1, '...|[{'credit_id': '5...| 9087|
    |[{'cast_id': 9, '...|[{'credit_id': '5...|12110|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|21032|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|10858|
    |[{'cast_id': 1, '...|[{'credit_id': '5...| 1408|
    |[{'cast_id': 4, '...|[{'credit_id': '5...|  524|
    |[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|
    |[{'cast_id': 42, ...|[{'credit_id': '5...|    5|
    |[{'cast_id': 1, '...|[{'credit_id': '5...| 9273|
    |[{'cast_id': 1, '...|[{'credit_id': '5...|11517|
    +--------------------+--------------------+-----+
    only showing top 20 rows
    
    
    

【Comments】:

【Solution 2】:

You can use the CSV reader's PERMISSIVE mode. The following example will work; I have verified it with Scala.

      spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load(path)
      

Reference: https://docs.databricks.com/data/data-sources/read-csv.html

【Comments】:

• It still loads the data the same way I posted: the id column contains JSON content spilled over from the other columns.