【Posted on】:2020-05-11 00:21:20
【Problem description】:
I am working with the Movies Dataset from https://www.kaggle.com/rounakbanik/the-movies-dataset#movies_metadata.csv.
The credits.csv file contains three columns: cast, crew, and id. The cast and crew cells are filled with JSON-like data (malformed: keys and values are wrapped in single quotes), which I later want to extract into separate DataFrames. But even just loading the file does not work correctly. This is what I am trying:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('movies').getOrCreate()
df = spark.read.csv(os.path.join(input_path, 'credits.csv'), header=True)
df.printSchema()
df.show()
root
|-- cast: string (nullable = true)
|-- crew: string (nullable = true)
|-- id: string (nullable = true)
+--------------------+--------------------+--------------------+
| cast| crew| id|
+--------------------+--------------------+--------------------+
|[{'cast_id': 14, ...|"[{'credit_id': '...| 'profile_path': ...|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
|[{'cast_id': 2, '...|[{'credit_id': '5...| 15602|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 1|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11862|
|"[{'cast_id': 25,...| 'credit_id': '52...| 'gender': 0|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11860|
|[{'cast_id': 2, '...|[{'credit_id': '5...| 45325|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 710|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 2|
|[{'cast_id': 9, '...|"[{'credit_id': '...| 'profile_path': ...|
|"[{'cast_id': 1, ...| 'credit_id': '56...| 'gender': 0|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 2|
|"[{'cast_id': 1, ...| 'credit_id': '59...| 'gender': 2|
|"[{'cast_id': 4, ...| 'credit_id': '52...| 'gender': 2|
|[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|
|[{'cast_id': 42, ...|"[{'credit_id': '...| 'profile_path': ...|
|"[{'cast_id': 1, ...| 'order': 14| 'profile_path': ...|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11517|
+--------------------+--------------------+--------------------+
only showing top 20 rows
The id column should contain only numbers, and the cast and crew cells should be loaded as strings, which is exactly what happens when I load the data with Pandas:
import os
import pandas as pd

df = pd.read_csv(os.path.join(input_path, 'credits.csv'))
type(df.cast[0])
str
How can I load the data into a Spark DataFrame correctly, and then collect the JSON data from each row into a new DataFrame?
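For context, a likely cause of the broken rows is that Spark's CSV reader defaults to backslash as the escape character, while this dataset escapes embedded double quotes by doubling them; passing `escape='"'` (and `multiLine=True` for cells containing newlines) to `spark.read.csv` usually keeps rows intact. Separately, the single-quoted pseudo-JSON is not valid JSON, but it is valid Python literal syntax, so it can be parsed with the standard library's `ast.literal_eval`. A minimal sketch of that parsing step, using a shortened sample cell in the dataset's format (the field names mirror the real file, the values are illustrative):

```python
import ast
import json

# A shortened sample of the single-quoted pseudo-JSON found in credits.csv.
# json.loads would reject it (JSON requires double quotes), but it is a
# valid Python literal, so ast.literal_eval can parse it safely.
cast_cell = "[{'cast_id': 14, 'character': 'Woody (voice)', 'name': 'Tom Hanks', 'id': 31}]"

cast = ast.literal_eval(cast_cell)  # -> list of dicts
print(cast[0]['name'])              # Tom Hanks

# Once parsed, the records can be re-serialized as proper JSON if needed:
print(json.dumps(cast[0], sort_keys=True))
```

In Spark, the same parsing could be wrapped in a UDF applied to the cast/crew columns, after which `explode` would turn each list of dicts into rows of a new DataFrame.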
【Discussion】:
Tags: python apache-spark pyspark