【发布时间】:2021-11-19 03:42:08
【问题描述】:
我正在尝试对来自 df 的数据进行分组,并从分组数据中生成 json 对象。我以为我走在正确的轨道上,但查看结果我认为我没有生成正确的 json 对象。
源数据
df1 = sqlContext.createDataFrame([
("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_04_20210815.DAT', '0.057','PRODUCT'),
("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_00_20210815.DAT', '0.068','PRODUCT'),
("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_01_20210815.DAT', '0.089','PRODUCT'),
("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_03_20210815.DAT', '0.100','PRODUCT')
], ["ID", "FileReceivedTimestamp", "SourceDataTimestamp","SourceFileName", "FileSize","FileName"])
df1.show()
+--------------+---------------------+-------------------+--------------------+--------+--------+
| ID|FileReceivedTimestamp|SourceDataTimestamp| SourceFileName|FileSize|FileName|
+--------------+---------------------+-------------------+--------------------+--------+--------+
|20210924155828| 2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_04_202108...| 0.057| PRODUCT|
|20210924155828| 2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_00_202108...| 0.068| PRODUCT|
|20210924155828| 2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_01_202108...| 0.089| PRODUCT|
|20210924155828| 2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_03_202108...| 0.100| PRODUCT|
+--------------+---------------------+-------------------+--------------------+--------+--------+
利用to_json、groupBy、agg和collect_list生成json对象。
df2 = (df1.select("FileName","ID",to_json(struct("SourceFileName","FileReceivedTimestamp","FileSize")).alias("metadata"))
.groupby("FileName","ID").agg(collect_list(col("metadata")).alias("jsonmetadata")))
我在下面看到使用数据块中的显示
为了查询和测试我的输出,我对生成的 json 进行了爆炸。尝试查看 SourceFileName 时出现此错误“无法从 MetadataArray#779 中提取值:需要结构类型但得到字符串;”
display(df2.select(explode(col("jsonmetadata")).alias("MetadataArray")
,col("MetadataArray.SourceFileName").alias("SourceFileName")
))
我在这里遗漏了什么....希望在使用 MetadataArray.SourceFileName、MetadataArray.FileSize 等爆炸后查询数据...
编辑:
我的期望是将 jsonmetadata 作为结构数组而不是字符串数组。删除 to_json 有助于解决我的问题。
df2 = (df1.select("FileName","ID",(struct("SourceFileName","FileReceivedTimestamp","FileSize")).alias("metadata"))
.groupby("FileName","ID").agg(collect_list(col("metadata")).alias("jsonmetadata")))
移除 to_json 转换后的架构。
root |-- 文件名:字符串(可为空 = true)|-- ID:字符串(可为空 = true) |-- jsonmetadata: array (nullable = true) | |-- 元素:结构 (containsNull = false) | | |-- 源文件名: 字符串 (可为空=真)| | |-- FileReceivedTimestamp: 字符串 (可为空=真)| | |-- FileSize: 字符串 (nullable = true)
【问题讨论】:
-
您的查询似乎运行良好。您真的要按 FileName 和 ID 或 SourceFileName 和 ID 进行分组吗?您能否也添加您预期的 df2?
标签: json apache-spark pyspark databricks