【问题标题】:How to convert Row to json in Spark 2 Scala如何在 Spark 2 Scala 中将 Row 转换为 json
【发布时间】:2017-05-26 21:26:05
【问题描述】:

有没有一种简单的方法可以将给定的 Row 对象转换为 json?

发现这个关于将整个 Dataframe 转换为 json 输出: Spark Row to JSON

但我只想将一行转换为 json。 这是我正在尝试做的伪代码。

更准确地说,我正在读取 json 作为 Dataframe 中的输入。 我正在生成一个主要基于列的新输出,但为所有不适合列的信息使用一个 json 字段。

我的问题是编写此函数的最简单方法是什么:convertRowToJson()

def convertRowToJson(row: Row): String = ???

def transformVenueTry(row: Row): Try[Venue] = {
  Try({
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  })
}

Psidom 的解决方案:

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

仅当 Row 只有一层而不是嵌套 Row 时才有效。这是架构:

StructType(
    StructField(indicator,StringType,true),   
    StructField(range,
    StructType(
        StructField(currency_code,StringType,true),
        StructField(maxrate,LongType,true), 
        StructField(minrate,LongType,true)),true))

也尝试了 Artem 的建议,但没有编译:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataFrame = rowRDD.toDF() //XXX does not compile
  dataFrame
}

【问题讨论】:

    标签: json scala apache-spark json4s


    【解决方案1】:

    您可以使用getValuesMap将行对象转换为Map,然后将其转换为JSON:

    import scala.util.parsing.json.JSONObject
    import org.apache.spark.sql._
    
    val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")    
    val row = df.first()          // this is an example row object
    
    def convertRowToJSON(row: Row): String = {
        val m = row.getValuesMap(row.schema.fieldNames)
        JSONObject(m).toString()
    }
    
    convertRowToJSON(row)
    // res46: String = {"A" : 1, "B" : 2, "C" : 3}
    

    【讨论】:

    • 更正:它实际上只适用于 Map / Struct 的第一级,不适用于嵌套 Map 你只会看到值而不是键。
    • @SamiBadawi 在哪里可以找到嵌套地图的解决方案?
    • 我也遇到了嵌套问题
    【解决方案2】:

    我需要读取 json 输入并生成 json 输出。 大多数字段都是单独处理的,但只需要保留一些 json 子对象。

    当 Spark 读取数据帧时,它会将记录转换为行。 Row 是一个类似 json 的结构。可以将其转换并写入 json。

    但我需要将一些子 json 结构提取到一个字符串中以用作新字段。

    这可以这样完成:

    dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))
    

    location.address 是传入的基于 json 的数据帧的子 json 对象的路径。 address_json 是该对象的列名转换为 json 的字符串版本。

    to_json 在 Spark 2.1 中实现。

    如果使用 json4s 生成输出 json,则 address_json 应解析为 AST 表示,否则输出 json 将转义 address_json 部分。

    【讨论】:

      【解决方案3】:

      注意 scala 类 scala.util.parsing.json.JSONObject 已弃用且不支持空值。

      @deprecated("该类将被移除。", "2.11.0")

      “JSONFormat.defaultFormat 不处理空值”

      https://issues.scala-lang.org/browse/SI-5092

      【讨论】:

      • 谢谢阿农。有一些关于 Scala 中现代化 json 支持的讨论。
      【解决方案4】:

      基本上,您可以拥有一个仅包含一行的数据框。因此,您可以尝试过滤初始数据帧,然后将其解析为 json。

      【讨论】:

      • 感谢您的建议。我试过你的方法: def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = { val sparkContext = sqlContext.sparkContext import sparkContext._ import sqlContext.implicits._ import sqlContext._ val rowRDD: RDD[Row] = sqlContext. sparkContext.makeRDD(row :: Nil) val dataFrame = rowRDD.toDF() //XXX 没有编译 dataFrame } 它没有编译。
      【解决方案5】:

      JSon 有 schema 但 Row 没有 schema,因此您需要在 Row 上应用 schema 并转换为 JSON。这是你可以做到的。

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types._
      
      def convertRowToJson(row: Row): String = {
      
        val schema = StructType(
            StructField("name", StringType, true) ::
            StructField("meta", StringType, false) ::  Nil)
      
            return sqlContext.applySchema(row, schema).toJSON
      }
      

      【讨论】:

        【解决方案6】:

        我有同样的问题,我有规范模式的镶木地板文件(没有数组),我只想获取 json 事件。我做了如下,它似乎工作得很好(Spark 2.1):

        import org.apache.spark.sql.types.StructType
        import org.apache.spark.sql.{DataFrame, Dataset, Row}
        import scala.util.parsing.json.JSONFormat.ValueFormatter
        import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}
        
        def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = {
          schema.fields.map {
            field =>
              try{
                if (field.dataType.typeName.equals("struct")){
                  field.name -> getValuesMap(row.getAs[Row](field.name),   field.dataType.asInstanceOf[StructType]) 
                }else{
                  field.name -> row.getAs[T](field.name)
                }
              }catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}}
          }.filter(xy => xy._2 != null).toMap
        }
        
        def convertRowToJSON(row: Row, schema: StructType): JSONObject = {
          val m: Map[String, Any] = getValuesMap(row, schema)
          JSONObject(m)
        }
        //I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
        val defaultFormatter : ValueFormatter = (x : Any) => x match {
          case s : String => "\"" + JSONFormat.quoteString(s) + "\""
          case jo : JSONObject => jo.toString(defaultFormatter)
          case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
          case ja : JSONArray => ja.toString(defaultFormatter)
          case other => other.toString
        }
        
        val someFile = "s3a://bucket/file"
        val df: DataFrame = sqlContext.read.load(someFile)
        val schema: StructType = df.schema
        val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema))
        

        【讨论】:

          【解决方案7】:

          如果你正在迭代一个数据框,你可以直接将数据框转换为一个新的数据框,里面有 json 对象并迭代它

          val df_json = df.toJSON

          【讨论】:

          • 请编辑您的问题,否则更喜欢 cmets。无论如何,请阅读社区的规则。
          • 是否可以设置列名?默认情况下它是“价值”,我想把它改成“身体”
          【解决方案8】:

          我结合了来自 Artem、KiranM 和 Psidom 的建议。做了很多尝试和错误,并提出了我测试嵌套结构的解决方案:

          def row2Json(row: Row, sqlContext: SQLContext): String = {
            import sqlContext.implicits
            val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
            val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
            dataframe.toJSON.first
          }
          

          此解决方案有效,但仅在以驱动程序模式运行时有效。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2021-03-08
            • 2017-06-13
            • 2016-05-09
            • 2022-12-18
            • 1970-01-01
            • 1970-01-01
            • 2021-04-13
            相关资源
            最近更新 更多