【问题标题】:pySpark - convert an entire dataframe column into JSON object before inserting into DBpySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象
【发布时间】:2021-12-18 21:33:13
【问题描述】:

在这一点上,我对 pyspark 的了解非常有限,因此我正在寻找一种快速解决方案来解决我在当前实现中遇到的这个问题。我正在尝试通过 pyspark 将 JSON 文件读取到数据帧中,将其转换为可以插入数据库表(DynamoDB)的对象。表中的列应代表 JSON 文件中指定的键。例如,如果我的 JSON 文件包含以下元素:

{
   "Records":[
      {
         "column1":"Value1",
         "column2":"Value2",
         "column3":"Value3",
         "column4":{
            "sub1":"Value4",
            "sub2":"Value5",
            "sub3":{
               "sub4":"Value6",
               "sub5":"Value7"
            }
         }
      },
      {
         "column1":"Value8",
         "column2":"Value9",
         "column3":"Value10",
         "column4":{
            "sub1":"Value11",
            "sub2":"Value12",
            "sub3":{
               "sub4":"Value13",
               "sub5":"Value14"
            }
         }
      }
   ]
}

数据库表中的列分别为column1、column2、column3和column4。对于 Map 类型的 column4,我需要将整个对象转换为字符串,然后再将其插入数据库。因此,在第一行的情况下,我可以期望在该列中看到:

{
   "sub1":"Value4",
   "sub2":"Value5",
   "sub3":{
      "sub4":"Value6",
      "sub5":"Value7"
   }
}

但是,这是我在运行脚本后在数据库表中看到的内容:

{ Value4, Value5, { Value6, Value7 }}

我知道这是因为在执行 DB 插入操作之前将所有列值转换为 String 类型之前需要做一些事情:

for col in Rows.columns:
    Rows = Rows.withColumn(col, Rows[col].cast(StringType()))

我正在寻找一种方法来纠正 Column4 的内容以表示原始 JSON 对象,然后再将它们转换为 String 类型。这是我到目前为止写的(不包括数据库插入操作)

import pyspark.sql.types as T
from pyspark.sql import functions as SF

df = spark.read.option("multiline", "true").json('/home/abhishek.tirkey/Documents/test')

Records = df.withColumn("Records", SF.explode(SF.col("Records")))

Rows = Records.select(
    "Records.column1",
    "Records.column2",
    "Records.column3",
    "Records.column4",
)

for col in Rows.columns:
    Rows = Rows.withColumn(col, Rows[col].cast(StringType()))

RowsJSON = Rows.toJSON()

【问题讨论】:

  • 您能否根据您的示例输入添加表格中的输出内容,包括其列及其内容。
  • 你找到to_json_object函数了吗?这将返回一个字符串...

标签: json apache-spark pyspark apache-spark-sql


【解决方案1】:

有一个to_json 函数可以做到这一点:

from pyspark.sql import functions as F

df = df.withColumn("record", F.explode("records")).select(
    "record.column1",
    "record.column2",
    "record.column3",
    F.to_json("record.column4").alias("column4"),
)

df.show()
+-------+-------+-------+--------------------+                                  
|column1|column2|column3|             column4|
+-------+-------+-------+--------------------+
| Value1| Value2| Value3|{"sub1":"Value4",...|
| Value8| Value9|Value10|{"sub1":"Value11"...|
+-------+-------+-------+--------------------+

df.printSchema()
root
 |-- column1: string (nullable = true)
 |-- column2: string (nullable = true)
 |-- column3: string (nullable = true)
 |-- column4: string (nullable = true)

【讨论】:

    猜你喜欢
    • 2021-11-18
    • 1970-01-01
    • 2021-06-12
    • 2016-03-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多