【发布时间】:2018-02-12 13:33:32
【问题描述】:
Spark 版本: '2.0.0.2.5.0.0-1245'
所以,我最初的问题有所改变,但仍然是同一个问题。
我想要做的是加载大量 JSON 文件并将它们转换为 DataFrame - 也可能将它们保存为 CSV 或 parquet 文件以供进一步处理。每个 JSON 文件代表最终 DataFrame 中的一行。
import os
import glob
HDFS_MOUNT = # ...
DATA_SET_BASE = # ...
schema = StructType([
StructField("documentId", StringType(), True),
StructField("group", StringType(), True),
StructField("text", StringType(), True)
])
# Get the file paths
file_paths = glob.glob(os.path.join(HDFS_MOUNT, DATA_SET_BASE, '**/*.json'))
file_paths = [f.replace(HDFS_MOUNT + '/', '') for f in file_paths]
print('Found {:d} files'.format(len(file_paths))) # 676 files
sql = SQLContext(sc)
df = sql.read.json(file_paths, schema=schema)
print('Loaded {:d} rows'.format(df.count())) # 9660 rows (what !?)
除了有 9660 行而不是 676 行(可用文件数)之外,我还遇到内容似乎是 None 的问题:
df.head(2)[0].asDict()
给予
{
'documentId': None,
'group': None,
'text': None,
}
示例数据
这当然只是假数据,但它与实际数据相似。
注意:某些字段可能会丢失,例如
text不能一直存在。
a.json
{
"documentId" : "001",
"group" : "A",
"category" : "indexed_document",
"linkIDs": ["adiojer", "asdi555", "1337"]
}
b.json
{
"documentId" : "002",
"group" : "B",
"category" : "indexed_document",
"linkIDs": ["linkId", "1000"],
"text": "This is the text of this document"
}
【问题讨论】:
-
所有文件的结构都一样吗?它们是否放在一个目录中?
-
@IuriiNedostup 好吧,
RDBDocument.to_row确保每一行具有相同的结构 - 我不知道是否有更好的方法来做到这一点,但每个 JSON 基本上代表最后一行数据框。是的,JSON 文件在同一个目录中。