【发布时间】:2021-12-09 12:32:35
【问题描述】:
以下是我们的 pyspark 应用程序代码 sn-p。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
df_new = df.withColumn('value', from_json('value', schema)) \
.where(col('value.version') == '1') \
.select(col('value.*'))\
.na.drop() \
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.show()
+-------+--------+---------------------------------------------+---+
| name| version| requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test| 1|{"data": {"score": 130, "group": "silver"}} | 1|
|kj-test| 1|{"data": {"score": 250, "group": "gold"}} | 2|
|kj-test| 1|{"data": {"score": 330, "group": "platinum"}}| 3|
+-------+--------+---------------------------------------------+---+
decrypt_udf UDF 函数 sn-p:
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return decrypted_json_str
当我将 spark 数据帧写入 S3 存储桶时,如下所示
df_new.write.mode('overwrite').json(path=s3outputpath)
生成的文件内容如下,这里requestBody 的值写为String,因此用双引号和转义内部双引号。
{"name":"kj-test","version":"1","requestBody":"{\"data\": {\"score\": 130, \"group\": \"silver\"}}","id":"1"}
{"name":"kj-test","version":"2","requestBody":"{\"data\": {\"score\": 250, \"group\": \"gold\"}}","id":"1"}
{"name":"kj-test","version":"3","requestBody":"{\"data\": {\"score\": 330, \"group\": \"platinum\"}}","id":"1"}
但是,我希望 requestBody 的值可以写成如下的 json。
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}
我知道我已将 requestBody 的类型指定为架构 StructField('requestBody', StringType(), True) 中的字符串,因此我以这种方式看到输出。我怎样才能达到我期望的输出?没有JsonType这样的类型
编辑:
请注意,我的 requestBody 架构不会总是这样 {"data": {"score": 130, "group": "silver"}}。对于给定的运行,它是固定的,但另一次运行可能具有完全不同的架构。
本质上,需要一种从 json 字符串推断模式的方法。找到一些可能有用的 SO 帖子,将尝试这些:
https://stackoverflow.com/a/45880574/948268
Spark from_json with dynamic schema
【问题讨论】:
-
您是否尝试过更改您的 UDF 并返回为
MapType而不是StringType? -
@pltc
MapType并未涵盖 json 的所有可能性。就像我无法在 MapType 中为 value 指定相应的类型。
标签: json apache-spark pyspark apache-spark-sql