【问题标题】:pyspark - Generate json from grouped datapyspark - 从分组数据生成 json
【发布时间】:2021-11-19 03:42:08
【问题描述】:

我正在尝试对来自 df 的数据进行分组,并从分组数据中生成 json 对象。我以为我走在正确的轨道上,但查看结果我认为我没有生成正确的 json 对象。

源数据

 df1 = sqlContext.createDataFrame([
      ("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_04_20210815.DAT', '0.057','PRODUCT'), 
      ("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_00_20210815.DAT', '0.068','PRODUCT'), 
      ("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_01_20210815.DAT', '0.089','PRODUCT'), 
      ("20210924155828",'2021-09-17 13:55:35','2021-08-15 00:00:00','PRODUCT_03_20210815.DAT', '0.100','PRODUCT')
      ], ["ID", "FileReceivedTimestamp", "SourceDataTimestamp","SourceFileName", "FileSize","FileName"])

    df1.show()
    +--------------+---------------------+-------------------+--------------------+--------+--------+
    |            ID|FileReceivedTimestamp|SourceDataTimestamp|      SourceFileName|FileSize|FileName|
    +--------------+---------------------+-------------------+--------------------+--------+--------+
    |20210924155828|  2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_04_202108...|   0.057| PRODUCT|
    |20210924155828|  2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_00_202108...|   0.068| PRODUCT|
    |20210924155828|  2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_01_202108...|   0.089| PRODUCT|
    |20210924155828|  2021-09-17 13:55:35|2021-08-15 00:00:00|PRODUCT_03_202108...|   0.100| PRODUCT|
    +--------------+---------------------+-------------------+--------------------+--------+--------+

利用to_json、groupBy、agg和collect_list生成json对象。

df2 = (df1.select("FileName","ID",to_json(struct("SourceFileName","FileReceivedTimestamp","FileSize")).alias("metadata"))
   .groupby("FileName","ID").agg(collect_list(col("metadata")).alias("jsonmetadata")))

我在下面看到使用数据块中的显示

为了查询和测试我的输出,我对生成的 json 进行了爆炸。尝试查看 SourceFileName 时出现此错误“无法从 MetadataArray#779 中提取值:需要结构类型但得到字符串;”

display(df2.select(explode(col("jsonmetadata")).alias("MetadataArray")
                  ,col("MetadataArray.SourceFileName").alias("SourceFileName")
                  ))

我在这里遗漏了什么....希望在使用 MetadataArray.SourceFileName、MetadataArray.FileSize 等爆炸后查询数据...

编辑:
我的期望是将 jsonmetadata 作为结构数组而不是字符串数组。删除 to_json 有助于解决我的问题。

df2 = (df1.select("FileName","ID",(struct("SourceFileName","FileReceivedTimestamp","FileSize")).alias("metadata"))
   .groupby("FileName","ID").agg(collect_list(col("metadata")).alias("jsonmetadata")))

移除 to_json 转换后的架构。

root |-- 文件名:字符串(可为空 = true)|-- ID:字符串(可为空 = true) |-- jsonmetadata: array (nullable = true) | |-- 元素:结构 (containsNull = false) | | |-- 源文件名: 字符串 (可为空=真)| | |-- FileReceivedTimestamp: 字符串 (可为空=真)| | |-- FileSize: 字符串 (nullable = true)

【问题讨论】:

  • 您的查询似乎运行良好。您真的要按 FileName 和 ID 或 SourceFileName 和 ID 进行分组吗?您能否也添加您预期的 df2?

标签: json apache-spark pyspark databricks


【解决方案1】:

由于 MetadataArray 值是 json 字符串,您需要使用具有正确架构的 from_json 函数来解析它们

from pyspark.sql.functions import col, explode, from_json

sch = "SourceFileName string, FileReceivedTimestamp string, FileSize string"

df2\
.select(explode(col("jsonmetadata")).alias("MetadataArray"), 
        from_json(col("MetadataArray"), sch).getField("SourceFileName").alias("SourceFileName"))\
.show()

# +--------------------+--------------------+
# |       MetadataArray|      SourceFileName|
# +--------------------+--------------------+
# |{"SourceFileName"...|PRODUCT_04_202108...|
# |{"SourceFileName"...|PRODUCT_00_202108...|
# |{"SourceFileName"...|PRODUCT_01_202108...|
# |{"SourceFileName"...|PRODUCT_03_202108...|
# +--------------------+--------------------+

【讨论】:

  • 感谢您的回复,应用架构。然而,在重新散列之后,我意识到 to_json 是没有必要的。使用 to_json 我得到的是字符串数组而不是结构数组。
猜你喜欢
  • 1970-01-01
  • 2014-07-30
  • 2021-07-01
  • 2016-10-17
  • 1970-01-01
  • 2020-06-10
  • 2020-11-24
  • 2014-06-29
  • 1970-01-01
相关资源
最近更新 更多