【问题标题】:Retain keys with null values while writing JSON in spark在 Spark 中编写 JSON 时保留具有空值的键
【发布时间】:2017-10-31 11:33:49
【问题描述】:

我正在尝试使用 spark 编写 JSON 文件。有些键具有null 作为值。这些在DataSet 中显示得很好,但是当我编写文件时,密钥被丢弃。我如何确保它们被保留?

写入文件的代码:

ddp.coalesce(20).write().mode("overwrite").json("hdfs://localhost:9000/user/dedupe_employee");

来自源的部分 JSON 数据:

"event_header": {
        "accept_language": null,
        "app_id": "App_ID",
        "app_name": null,
        "client_ip_address": "IP",
        "event_id": "ID",
        "event_timestamp": null,
        "offering_id": "Offering",
        "server_ip_address": "IP",
        "server_timestamp": 1492565987565,
        "topic_name": "Topic",
        "version": "1.0"
    }

输出:

"event_header": {
        "app_id": "App_ID",
        "client_ip_address": "IP",
        "event_id": "ID",
        "offering_id": "Offering",
        "server_ip_address": "IP",
        "server_timestamp": 1492565987565,
        "topic_name": "Topic",
        "version": "1.0"
    }

在上面的示例中,键 accept_languageapp_nameevent_timestamp 已被删除。

【问题讨论】:

    标签: java json apache-spark apache-spark-sql


    【解决方案1】:

    显然,spark 没有提供任何处理空值的选项。因此,以下自定义解决方案应该可以工作。

    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
    import com.fasterxml.jackson.databind.ObjectMapper
    
    case class EventHeader(accept_language:String,app_id:String,app_name:String,client_ip_address:String,event_id: String,event_timestamp:String,offering_id:String,server_ip_address:String,server_timestamp:Long,topic_name:String,version:String)
    
    val ds = Seq(EventHeader(null,"App_ID",null,"IP","ID",null,"Offering","IP",1492565987565L,"Topic","1.0")).toDS()
    
    val ds1 = ds.mapPartitions(records => {
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    records.map(mapper.writeValueAsString(_))
    })
    
    ds1.coalesce(1).write.text("hdfs://localhost:9000/user/dedupe_employee")
    

    这将产生如下输出:

    {"accept_language":null,"app_id":"App_ID","app_name":null,"client_ip_address":"IP","event_id":"ID","event_timestamp":null,"offering_id":"Offering","server_ip_address":"IP","server_timestamp":1492565987565,"topic_name":"Topic","version":"1.0"}
    

    【讨论】:

    • 你能在pyspark中提供这样的解决方案吗?谢谢
    【解决方案2】:

    如果您使用的是 Spark 3,则可以添加

    spark.sql.jsonGenerator.ignoreNullFields false
    

    【讨论】:

      【解决方案3】:

      ignoreNullFields 是您希望从 Spark 3 开始将 DataFrame 转换为 json 文件时设置的选项。

      如果您需要 Spark 2(特别是 PySpark 2.4.6),您可以尝试使用 Python dict 格式将 DataFrame 转换为 rdd。然后调用pyspark.rdd.saveTextFile将json文件输出到hdfs。以下示例可能会有所帮助。

      cols = ddp.columns
      ddp_ = ddp.rdd
      ddp_ = ddp_.map(lambda row: dict([(c, row[c]) for c in cols])
      ddp_ = ddp.repartition(1).saveAsTextFile(your_hdfs_file_path)
      

      这应该会产生类似的输出文件,

      {"accept_language": None, "app_id":"123", ...}
      {"accept_language": None, "app_id":"456", ...}
      

      更重要的是,如果你想用 JSON null 替换 Python None,你需要将每个 dict 转储到 json 中。

      ddp_ = ddp_.map(lambda row: json.dumps(row, ensure.ascii=False))
      

      【讨论】:

        【解决方案4】:

        从 Spark 3 开始,如果您使用的是 DataFrameWriter 类

        https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html#json-java.lang.String-

        (同样适用于 pyspark)

        https://spark.apache.org/docs/3.0.0-preview/api/python/_modules/pyspark/sql/readwriter.html

        它的 json 方法有一个选项 ignoreNullFields=None

        其中 None 表示 True。

        所以只需将此选项设置为 false。

        ddp.coalesce(20).write().mode("overwrite").option("ignoreNullFields", "false").json("hdfs://localhost:9000/user/dedupe_employee")
        

        【讨论】:

          猜你喜欢
          • 2022-11-12
          • 2011-12-29
          • 2022-07-25
          • 1970-01-01
          • 2023-01-19
          • 2018-06-27
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多