【问题标题】:Apache spark: Write JSON DataFrame partitionBy nested columnsApache spark:通过嵌套列编写 JSON DataFrame 分区
【发布时间】:2019-03-17 18:23:08
【问题描述】:

我有这种 JSON 数据:

{
 "data": [
    {
      "id": "4619623",
      "team": "452144",
      "created_on": "2018-10-09 02:55:51",
      "links": {
        "edit": "https://some_page",
        "publish": "https://some_publish",
        "default": "https://some_default"
      }
    },
    {
      "id": "4619600",
      "team": "452144",
      "created_on": "2018-10-09 02:42:25",
      "links": {
        "edit": "https://some_page",
        "publish": "https://some_publish",
        "default": "https://some_default"
      }
    }
}

我使用 Apache spark 读取了这些数据,我想将它们按 id 列分区写入。当我使用这个时: df.write.partitionBy("data.id").json(<path_to_folder>)

我会得到错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Partition column data.id not found in schema

我也尝试过使用这样的爆炸功能:

import org.apache.spark.sql.functions.{col, explode}
val renamedDf= df.withColumn("id", explode(col("data.id")))
renamedDf.write.partitionBy("id").json(<path_to_folder>)

这确实有帮助,但每个 id 分区文件夹都包含相同的原始 JSON 文件。

编辑:df DataFrame 的架构:

 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created_on: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- links: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- edit: string (nullable = true)
 |    |    |    |-- publish: string (nullable = true)

重命名的 Df DataFrame 的架构:

 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created_on: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- links: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- edit: string (nullable = true)
 |    |    |    |-- publish: string (nullable = true)
 |-- id: string (nullable = true)

我使用的是 spark 2.1.0

我找到了这个解决方案:DataFrame partitionBy on nested columns

还有这个例子:http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/

但是这些都没有帮助我解决我的问题。

感谢 andvance 提供的任何帮助。

【问题讨论】:

  • 你能发布 df.printSchema 的输出并重命名为 DF.printschema
  • 最新的 spark 有一个多行选项来读取嵌套的 json,你可以试试
  • 抱歉回复晚了。我在上面的问题中添加了您需要的架构。

标签: json apache-spark dataframe partition-by


【解决方案1】:

试试下面的代码:

val renamedDf = df
         .select(explode(col("data")) as "x" )
         .select($"x.*")             
renamedDf.write.partitionBy("id").json(<path_to_folder>)

【讨论】:

  • 完美。这样可行。非常感谢您的帮助:)
【解决方案2】:

您只是在初始爆炸后缺少一个选择语句

val df = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("/FileStore/tables/test.json")
df.printSchema

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created_on: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- links: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- edit: string (nullable = true)
 |    |    |    |-- publish: string (nullable = true)
 |    |    |-- team: string (nullable = true)

import org.apache.spark.sql.functions.{col, explode}
val df1= df.withColumn("data", explode(col("data")))
df1.printSchema

root
 |-- data: struct (nullable = true)
 |    |-- created_on: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- links: struct (nullable = true)
 |    |    |-- default: string (nullable = true)
 |    |    |-- edit: string (nullable = true)
 |    |    |-- publish: string (nullable = true)
 |    |-- team: string (nullable = true)

val df2 = df1.select("data.created_on","data.id","data.team","data.links")
df2.show

+-------------------+-------+------+--------------------+
|         created_on|     id|  team|               links|
+-------------------+-------+------+--------------------+
|2018-10-09 02:55:51|4619623|452144|[https://some_def...|
|2018-10-09 02:42:25|4619600|452144|[https://some_def...|
+-------------------+-------+------+--------------------+

df2.write.partitionBy("id").json("FileStore/tables/test_part.json")
val f = spark.read.json("/FileStore/tables/test_part.json/id=4619600")
f.show

+-------------------+--------------------+------+
|         created_on|               links|  team|
+-------------------+--------------------+------+
|2018-10-09 02:42:25|[https://some_def...|452144|
+-------------------+--------------------+------+

val full = spark.read.json("/FileStore/tables/test_part.json")
full.show

+-------------------+--------------------+------+-------+
|         created_on|               links|  team|     id|
+-------------------+--------------------+------+-------+
|2018-10-09 02:55:51|[https://some_def...|452144|4619623|
|2018-10-09 02:42:25|[https://some_def...|452144|4619600|
+-------------------+--------------------+------+-------+

【讨论】:

    猜你喜欢
    • 2016-07-19
    • 2020-11-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-26
    • 2015-12-20
    相关资源
    最近更新 更多