【问题标题】:write spark dataframe as array of json (pyspark)将 spark 数据帧写入 json 数组(pyspark)
【发布时间】:2023-04-11 01:24:02
【问题描述】:

我想将我的 spark 数据框写为一组 JSON 文件,尤其是每个文件都写为 JSON 数组。 让我用一个简单的(可重现的)代码来解释。

我们有:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))

将数据框保存为:

df.write.json('s3://path/to/json')

刚刚创建的每个文件每行都有一个 JSON 对象,例如:

{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

但我希望每个文件拥有一个 JSON 数组:

[
   {"x":0.9953802385540144,"y":0.476027611419198},
   {"x":0.929599290575914,"y":0.72878523939521},
   {"x":0.951701684432855,"y":0.8008064729546504}
]

【问题讨论】:

  • 每个执行器并行写入其数据。您可以连接所有零件文件并自己添加括号。
  • 这只是一种解决方法,即使它对我来说不可行,因为一旦在 s3 上写入文件,就会触发 lambda...
  • 您是否可以让每个文件成为json 的数组,或者您是否希望将全部内容放在一个文件中。如果你使用 spark 2.4 df.coalesce(1).write.json("path", lineSep="\n,") 几乎可以让你到达那里......
  • @pault,可以让每个文件都是一个 json 数组,而不仅仅是一个文件

标签: python json apache-spark pyspark


【解决方案1】:

目前不可能让 spark “本地”以您想要的格式写入单个文件,因为 spark 以分布式(并行)方式工作,每个执行程序独立写入其数据部分。

但是,由于您是 okay with having each file be an array of json not only [one] file,因此您可以使用以下一种解决方法来实现所需的输出:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .write.text("s3://path/to/json")

首先,您从df 中的所有列创建一个json。然后按 spark 分区 ID 分组并使用 collect_list 聚合。这将把该分区上的所有jsons 放入一个列表中。由于您在分区内进行聚合,因此不需要对数据进行混洗。

现在选择列表列,转换为字符串,并将其写入文本文件。

这是一个文件的外观示例:

[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

请注意,您可能会得到一些空文件。


如果您指定一个空的groupBy,您可能可以强制 spark 将数据写入 ONE 文件,但这会导致将所有数据强制写入单个分区,从而导致内存不足错误。

【讨论】:

  • 感谢@pault,您编写的解决方案又是一种解决方法,但我认为这是实现目标的唯一方法。事实上,除了连接单个 json 之外,Spark 没有其他选择来编写文件:每个执行程序只是编写自己的一组 json 对象,对其他执行程序一无所知(因为它们以并行方式工作)。因此,添加尾随/结束括号或逗号来分隔 json 对象将违背 spark“哲学”。您能否将此推理添加到您的答案中,以便我接受它
  • @enneppi 我根据您的反馈添加了一些说明。
  • 嗨@pault - 你会如何添加一个根元素呢? json_list 数组之前
【解决方案2】:

如果数据不是超级庞大,并且可以将列表作为一个 JSON 文件,则以下解决方法也有效。首先,将 Pyspark 数据帧转换为 Pandas,然后转换为字典列表。然后,列表可以转储为 JSON。

list_of_dicts = df.toPandas().to_dict('records')
json_file = open('path/to/file.json', 'w')

json_file.write(json.dumps(list_of_dicts))
json_file.close()

【讨论】:

    猜你喜欢
    • 2018-09-06
    • 2020-10-06
    • 2018-06-24
    • 2020-11-26
    • 2018-01-17
    • 1970-01-01
    • 1970-01-01
    • 2018-08-13
    • 1970-01-01
    相关资源
    最近更新 更多