【问题标题】:Pyspark : Convert a CSV to Nested JSONPyspark:将 CSV 转换为嵌套 JSON
【发布时间】:2020-04-02 19:01:15
【问题描述】:

我是 pyspark 的新手。我有一个要求,我需要将 hdfs 位置的大 CSV 文件转换为基于不同 primaryId 的多个嵌套 JSON 文件。

示例输入: data.csv

**PrimaryId,FirstName,LastName,City,CarName,DogName**
100,John,Smith,NewYork,Toyota,Spike
100,John,Smith,NewYork,BMW,Spike
100,John,Smith,NewYork,Toyota,Rusty
100,John,Smith,NewYork,BMW,Rusty
101,Ben,Swan,Sydney,Volkswagen,Buddy
101,Ben,Swan,Sydney,Ford,Buddy
101,Ben,Swan,Sydney,Audi,Buddy
101,Ben,Swan,Sydney,Volkswagen,Max
101,Ben,Swan,Sydney,Ford,Max
101,Ben,Swan,Sydney,Audi,Max
102,Julia,Brown,London,Mini,Lucy

示例输出文件:

文件1: Output_100.json

{
    "100": [
        {
            "City": "NewYork", 
            "FirstName": "John", 
            "LastName": "Smith", 
            "CarName": [
                "Toyota", 
                "BMW"
            ], 
            "DogName": [
                "Spike", 
                "Rusty"
            ]
        }
}

文件2: Output_101.json

{
    "101": [
        {
            "City": "Sydney", 
            "FirstName": "Ben", 
            "LastName": "Swan", 
            "CarName": [
                "Volkswagen", 
                "Ford", 
                "Audi"
            ], 
            "DogName": [
                "Buddy", 
                "Max"
            ]
        }
}

文件3: Output_102.json

{
    "102": [
        {
            "City": "London", 
            "FirstName": "Julia", 
            "LastName": "Brown", 
            "CarName": [
                "Mini"
            ], 
            "DogName": [
                "Lucy"
            ]
        }
    ]
}

我们将不胜感激。

【问题讨论】:

  • 你试过了吗?已经?您可以生成一组主 ID,然后遍历每个条目,生成一个字典数组。
  • 我并不是真正的编程背景,并尝试了谷歌的一些解决方案,但这不能满足我的要求。这就是我寻求帮助的原因!!!

标签: python json csv hadoop pyspark


【解决方案1】:

看来您需要对 Id 进行分组,并将 Cars 和 Dogs 收集为一组。

从 pyspark.sql.functions 导入 collect_set

df = spark.read.format("csv").option("header", "true").load("cars.csv")
df2 = (
    df
    .groupBy("PrimaryId","FirstName","LastName")
    .agg(collect_set('CarName').alias('CarName'), collect_set('DogName').alias('DogName'))
)
df2.write.format("json").save("cars.json", mode="overwrite")

生成的文件:

{"PrimaryId":"100","FirstName":"John","LastName":"Smith","CarName":["Toyota","BMW"],"DogName":["Spike","Rusty"]}

{"PrimaryId":"101","FirstName":"Ben","LastName":"Swan","CarName":["Ford","Volkswagen","Audi"],"DogName":["Max","Buddy"]}

{"PrimaryId":"102","FirstName":"Julia","LastName":"Brown","CarName":["Mini"],"DogName":["Lucy"]}

如果这就是你要找的,请告诉我。

【讨论】:

  • 感谢您的帮助。这只是一个示例文件,我必须在实际文件上使用此代码,其中包含 40-50 个字段和数百万行。
【解决方案2】:

您可以使用 pandas.groupby() 按 Id 分组,然后遍历 DataFrameGroupBy 对象创建对象并写入文件。

您需要通过$ pip install pandas 将pandas 安装到您的virtualenv 中。

# coding: utf-8
import json
import pandas as pd


def group_csv_columns(csv_file):
    df = pd.read_csv(csv_file)
    group_frame = df.groupby(['PrimaryId'])

    for i in group_frame:
        data_frame = i[1]
        data = {}
        data[i[0]] = [{
            "City": data_frame['City'].unique().tolist()[0],
            "FirstName": data_frame['FirstName'].unique().tolist()[0],
            "CarName": data_frame['CarName'].unique().tolist(),
            'DogName': data_frame['DogName'].unique().tolist(),
            'LastName': data_frame['LastName'].unique().tolist()[0],
        }]
        # Write to file
        file_name = 'Output_' + str(i[0]) + '.json'
        with open(file_name, 'w') as fh:
            contents = json.dumps(data)
            fh.write(contents)


group_csv_columns('/tmp/sample.csv')

调用group_csv_columns(),文件名包含csv内容。

查看pandas docs

【讨论】:

  • 我们可以将 hdfs 路径作为输入/输出而不是本地文件系统吗?
  • 你可以试试with hd.open("/home/file.csv") as f: df = pd.read_csv(f),看看这个答案stackoverflow.com/a/35644136/1831811
  • 当然会尝试,但我认为这段代码在 hadoop 环境中应该是有效的。
猜你喜欢
  • 1970-01-01
  • 2018-01-07
  • 2020-10-28
  • 2021-03-12
  • 2022-01-07
  • 2018-10-27
  • 2021-05-17
  • 2021-01-08
相关资源
最近更新 更多