【问题标题】:How to save Map to Json in Spark using scala?如何使用 scala 将 Map 保存到 Spark 中的 Json?
【发布时间】:2017-10-28 21:30:54
【问题描述】:

我需要使用 Spark 将 Map(键值对)保存在一列中。要求是其他人可以将数据与其他工具(如 PIG)一起使用,因此最好将 Map 保存为通用格式而不是特殊格式的字符串。我使用此代码创建列:

StructField("cMap", DataTypes.createMapType(StringType, StringType), true) ::

然后在我创建数据框后,我得到了架构:

|-- cMap: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)

然后我将数据框保存到 Json:

df.write.json(path)

我发现Json输出是:

"cMap":{"1":"a","2":"b","3":"c"}

所以一旦我下次从文件中读取它:

val new_df = sqlContext.read.json(path)

我得到了架构:

|-- cMap: struct (nullable = true)
|    |-- 1: string
|    |-- 2: string
|    |-- 3: string

是否有任何有效的方法可以在不进行额外处理的情况下在 Json 中保存和读取地图(我可以将地图保存为特殊字符串并对其进行解码,但我认为它不应该那么复杂)。谢谢。

【问题讨论】:

    标签: java sql json scala apache-spark


    【解决方案1】:

    您可以将表格另存为parquet文件

    • 写:

      df.write.parquet("mydf.parquet")

    • 阅读

      val new_df = spark.read.parquet("mydf.parquet")

    spark guide save-modes

    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._
    
    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
    
    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")
    
    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")
    
    // Parquet files can also be used to create a temporary view and then used in SQL statements
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    

    【讨论】:

      【解决方案2】:

      Parquet 格式应该可以解决您遇到的问题。 Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression

      只需将其保存到Parquet,如下所示

      df.write.mode(SaveMode.Overwrite).parquet("path to the output")
      

      并阅读如下

      val new_df = sqlContext.read.parquet("path to the above output")
      

      希望对你有帮助

      【讨论】:

        猜你喜欢
        • 2020-05-20
        • 2019-05-29
        • 2016-10-12
        • 1970-01-01
        • 2016-01-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多