【问题标题】:running nested jobs in spark在 Spark 中运行嵌套作业
【发布时间】:2016-07-28 12:20:35
【问题描述】:

我正在使用 PySpark。我在 s3 上有一个 gzip 的 json 文件列表,我必须访问、转换然后在 parquet 中导出到 s3。每个 json 文件包含大约 100k 行,因此并行化它没有多大意义(但我愿意并行化它),但我已经并行化了大约 5k 个文件。我的方法是将json文件列表传递给脚本->在列表上运行并行化->运行映射(?这是我被阻止的地方)。如何访问和转换 json 从转换后的 json 创建一个 DF 并将其作为镶木地板转储到 s3 中。

【问题讨论】:

  • 您可以将逗号连接的路径列表传递给阅读器。
  • 根据您的 spark 安装,您可以像这样直接从 spark 读取 s3:rawtext = sc.textFile('s3://bucket/file')
  • 当我倾倒镶木地板时,它应该针对每个 json,意思是 1.json => 1.parquet,如果我用逗号分隔文件名,这种一致性将会丢失。

标签: json amazon-s3 apache-spark pyspark


【解决方案1】:

要以分布式方式读取 json,您需要像您提到的那样并行化您的密钥。要在从 s3 读取时执行此操作,您需要使用 boto3。下面是如何做到这一点的骨架草图。您可能需要修改 DistributedJsonRead 以适合您的用例。

import boto3
import json
from pyspark.sql import Row

def distributedJsonRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=key)
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8'))
    return Row(**contents)

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys
dataRdd = pkeys.map(distributedJsonRead)

Boto3 参考:http://boto3.readthedocs.org/en/latest/guide/quickstart.html

编辑:解决输入文件到输出文件的 1:1 映射问题

稍后,合并 parquet 数据集可能会更易于使用。但如果这是你需要这样做的方式,你可以尝试这样的事情

for k in keyList:
     rawtext = sc.read.json(k) # or whichever method you need to use to read in the data
     outpath = k[:-4]+'parquet'
     rawtext.write.parquet(outpath)

如果您想要 json 到 parquet 文件的 1:1 映射,我认为您将无法并行化这些操作。 Spark 的读/写功能设计为由驱动程序调用,需要访问 sc 和 sqlContext。这是拥有 1 个 parquet 目录可能是可行的方法的另一个原因。

【讨论】:

  • 但我也想知道镶木地板是从哪个文件生成的,即如果假设 json 在s3://bucket/json/12/3/abc.json 中,其结果将转到s3://bucket/json/12/3/abc.parquet
  • 每天产生大约 20GB 的数据。通常在这种情况下,我必须分析一两天的数据,合并无济于事。所以基本上你是说我不能并行化文件读取。
猜你喜欢
  • 1970-01-01
  • 2018-10-29
  • 1970-01-01
  • 2023-01-25
  • 1970-01-01
  • 2015-08-03
  • 2015-05-31
  • 2016-10-03
  • 1970-01-01
相关资源
最近更新 更多