【问题标题】:PySpark: How to Read Many JSON Files, Multiple Records Per FilePySpark:如何读取多个 JSON 文件,每个文件多个记录
【发布时间】:2015-04-25 11:46:05
【问题描述】:

我有一个存储在 S3 存储桶中的大型数据集,但它不是单个大文件,而是由许多(准确地说是 113K)单独的 JSON 文件组成,每个文件包含 100-1000 个观察值。这些观察不是最高级别的,但需要在每个 JSON 中进行一些导航才能访问。 IE。 json["interactions"] 是一个字典列表。

我正在尝试使用 Spark/PySpark(1.1.1 版)来解析和减少这些数据,但我无法找到将其加载到 RDD 中的正确方法,因为它不是所有记录 > 一个文件(在这种情况下我会使用 sc.textFile,虽然这里增加了 JSON 的复杂性),也不是每个记录 > 一个文件(在这种情况下我会使用 sc.wholeTextFiles)。

我最好的选择是使用 sc.wholeTextFiles,然后使用地图(或者在这种情况下是 flatMap?)将多个观察结果从存储在单个文件名键下提取到它们自己的键?或者有没有更简单的方法可以做到这一点,我错过了?

我在这里看到了建议只对通过 sc.textFile 加载的所有文件使用 json.loads() 的答案,但这似乎对我不起作用,因为 JSON 不是简单的最高级别列表.

【问题讨论】:

  • 我遇到了类似的问题。如果有解决方案,请告诉我。我刚开始尝试 pyspark,我在 s3 中有很多 json 文件要分析

标签: json amazon-s3 apache-spark pyspark


【解决方案1】:

前面的答案不会以分布式方式读取文件(请参阅reference)。为此,您需要并行化 s3 键,然后在下面的 flatMap 步骤中读入文件。

import boto3
import json
from pyspark.sql import Row

def distributedJsonRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Key)
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8'))
    for dicts in content['interactions']
        yield Row(**dicts)

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys
dataRdd = pkeys.flatMap(distributedJsonRead)

Boto3 Reference

【讨论】:

    【解决方案2】:

    使用 DataFrame 怎么样?

    确实 testFrame = sqlContext.read.json('s3n://<bucket>/<key>') 从一个文件中给你想要的东西?

    每个观察是否都有相同的“列”(键数)?

    如果是这样,您可以使用 boto 列出要添加的每个对象,将它们读入并将它们相互合并。

    from pyspark.sql import SQLContext
    import boto3
    from pyspark.sql.types import *
    sqlContext = SQLContext(sc)
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('<bucket>')
    
    aws_secret_access_key = '<secret>'
    aws_access_key_id = '<key>'
    
    #Configure spark with your S3 access keys
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key_id)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)
    object_list = [k for k in bucket.objects.all() ]
    key_list = [k.key for k in bucket.objects.all()]
    
    paths = ['s3n://'+o.bucket_name+'/'+ o.key for o in object_list ]
    
    dataframes = [sqlContext.read.json(path) for path in paths]
    
    df = dataframes[0]
    for idx, frame in enumerate(dataframes):
        df = df.unionAll(frame)
    

    我是新手,所以我想知道是否有更好的方法来使用包含大量 s3 文件的数据帧,但到目前为止,这对我有用。

    【讨论】:

    • StackOverflow 是一个问答网站,而不是论坛。因此,我们喜欢将积极的答案表述为解决方案,而不是充满问题的喋喋不休的回答。尤其适用于回复旧线程时,因为 OP 不太可能让您参与对话。请考虑将您的回复重写为可靠的答案,如有必要,请注明。
    【解决方案3】:

    名称具有误导性(因为它是单数),但sparkContext.textFile()(至少在 Scala 中)也接受目录名称或通配符路径,因此您只能说 textFile("/my/dir/*.json")

    【讨论】:

      猜你喜欢
      • 2020-05-21
      • 2020-12-02
      • 1970-01-01
      • 1970-01-01
      • 2019-02-10
      • 2020-09-19
      • 2021-11-19
      • 1970-01-01
      • 2019-04-03
      相关资源
      最近更新 更多