【问题标题】:Transform a list of files (JSON) to a dataframe将文件列表 (JSON) 转换为数据框
【发布时间】:2018-02-12 13:33:32
【问题描述】:

Spark 版本: '2.0.0.2.5.0.0-1245'


所以,我最初的问题有所改变,但仍然是同一个问题。

我想要做的是加载大量 JSON 文件并将它们转换为 DataFrame - 也可能将它们保存为 CSV 或 parquet 文件以供进一步处理。每个 JSON 文件代表最终 DataFrame 中的一行。

import os
import glob

HDFS_MOUNT = # ...
DATA_SET_BASE = # ...

schema = StructType([
        StructField("documentId", StringType(), True),
        StructField("group", StringType(), True),
        StructField("text", StringType(), True)     
])

# Get the file paths    
file_paths = glob.glob(os.path.join(HDFS_MOUNT, DATA_SET_BASE, '**/*.json'))
file_paths = [f.replace(HDFS_MOUNT + '/', '') for f in file_paths]

print('Found {:d} files'.format(len(file_paths))) # 676 files

sql = SQLContext(sc)

df = sql.read.json(file_paths, schema=schema)

print('Loaded {:d} rows'.format(df.count())) # 9660 rows (what !?)

除了有 9660 行而不是 676 行(可用文件数)之外,我还遇到内容似乎是 None 的问题:

df.head(2)[0].asDict()

给予

{
 'documentId': None,
 'group': None,
 'text': None,
}

示例数据

这当然只是假数据,但它与实际数据相似。

注意:某些字段可能会丢失,例如text 不能一直存在。

a.json

{
  "documentId" : "001",
  "group" : "A",
  "category" : "indexed_document",
  "linkIDs": ["adiojer", "asdi555", "1337"]
}

b.json

{
  "documentId" : "002",
  "group" : "B",
  "category" : "indexed_document",
  "linkIDs": ["linkId", "1000"],
  "text": "This is the text of this document"
}

【问题讨论】:

  • 所有文件的结构都一样吗?它们是否放在一个目录中?
  • @IuriiNedostup 好吧,RDBDocument.to_row 确保每一行具有相同的结构 - 我不知道是否有更好的方法来做到这一点,但每个 JSON 基本上代表最后一行数据框。是的,JSON 文件在同一个目录中。

标签: hadoop pyspark hdfs


【解决方案1】:

假设你所有的文件都具有相同的结构并且在同一个目录中:

df = sql_cntx.read.json('/hdfs/path/to/folder/*.json')

如果任何列的所有行都具有 Null 值,则可能会出现问题。然后 spark 将无法确定架构,因此您可以选择告诉 spark 使用哪个架构:

from pyspark import SparkContext, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, LongType

sc = SparkContext(appName="My app")
sql_cntx = SQLContext(sc)

schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", LongType(), True)
])

df = sql_cntx.read.json('/hdfs/path/to/folder/*.json', schema=schema)

统一更新: 如果文件具有多行格式的 json,您可以尝试以下代码:

sc = SparkContext(appName='Test') 
sql_context = SQLContext(sc) 

rdd = sc.wholeTextFiles('/tmp/test/*.json').values() 
df = sql_context.read.json(rdd, schema=schema)
df.show()

【讨论】:

  • 我明白了……嗯,是的……可能是因为空值。我现在看到的唯一选择是读取 json 标头,定义架构并制作数据框。
  • 可以发几行作为例子吗?
  • 我添加了两个“示例文件”。它表明,例如text 并不总是存在。所有字段都应为StringType。我试图指定 schame 但 df.head(1)[0].asDict() 返回所有值都是 None :/
  • 我现在已经将我原来的问题稍微改成了我们现在所处的位置。奇怪的是行数是例如不同的..
  • 快速问题:文件可以有多行吗?它们是如何分开的?
猜你喜欢
  • 2020-05-15
  • 2019-02-08
  • 2023-02-23
  • 2018-07-21
  • 2020-09-02
  • 2021-02-09
  • 1970-01-01
  • 2023-01-13
  • 2020-02-29
相关资源
最近更新 更多