【问题标题】:如何将 Spark DataFrame 中具有未知键值对的 JSON 解析为多行值（How to parse a JSON with unknown key-value pairs in a Spark DataFrame to multiple rows of values）
【发布时间】:2020-08-04 19:28:40
【问题描述】:

输入数据帧来自键值对中的 Kafka -

输入:

{
"CBT-POSTED-TXN": {
    "eventTyp": "TXN-NEW",
    "eventCatgry": "CBT-POSTED-TXN"
  },
  "CBT-BALANCE-CHG": {
    "eventTyp": "TXN-NEW",
    "eventCatgry": "CBT-BALANCE-CHG",
    "enablerEventVer": "1.0.0"
  }
} 

输出的DataFrame需要是-

第 1 行

{
    "eventTyp": "TXN-NEW",
    "eventCatgry": "CBT-POSTED-TXN"
}

第 2 行

{
    "eventTyp": "TXN-NEW",
    "eventCatgry": "CBT-BALANCE-CHG",
    "enablerEventVer": "1.0.0"
 }

以下是我尝试解析的方式-

/**
 * Parses the Kafka `value` column (a JSON string) of the incoming DataFrame
 * against the schema derived from [[k3SourceSchema]], keeping the Kafka
 * metadata columns alongside the parsed struct.
 *
 * @param df    raw Kafka source DataFrame (binary key/value plus metadata)
 * @param spark active session, used only for its implicits
 * @return the translated DataFrame (parsed struct + original columns)
 */
override def translateSource(df: DataFrame, spark: SparkSession = SparkUtil.getSparkSession("")): DataFrame = {
  import spark.implicits._

  // NOTE(review): this schema is a struct with a single `k3SourceValue` map field.
  // The incoming JSON has the event maps at the TOP level (no `k3SourceValue` key),
  // so from_json will likely yield null here — confirm against a sample message.
  val k3ProcessingSchema: StructType = Encoders.product[k3SourceSchema].schema

  val dfTranslated = df
    // Kafka delivers key/value as binary; cast everything to strings first.
    .selectExpr("CAST(key AS STRING) key", "cast(value as string) value", "CAST(partition as String)", "CAST(offset as String)", "CAST(timestamp as String)")
    .as[(String, String, String, String, String)]
    .select(from_json($"value", k3ProcessingSchema).as("k3Clubbed"), $"value", $"key", $"partition", $"offset", $"timestamp")
    // Drop rows whose value could not be parsed at all.
    .filter("k3Clubbed is not null")
    .as[(k3SourceSchema, String, String, String, String, String)]

  // Return the translated dataset; the original code collected it to the driver
  // (discarding the result) and returned the untouched input `df` — both defects.
  dfTranslated.toDF()
}

/**
 * Wrapper used to derive a Spark schema for the event payload: a map from
 * event-category name to its raw string value.
 *
 * @param k3SourceValue event-category -> serialized event fields
 */
final case class k3SourceSchema(
  k3SourceValue: Map[String, String]
)

但是这段代码无法把 df 的 value 列（其中包含多个事件的 JSON）解析为 Map[String, String]（事件名到事件内容的映射）。

【问题讨论】:

    标签: scala dataframe


    【解决方案1】:

    您正在谈论操作数据框的行并输出多行。我认为最好的方法是在数据框上使用explode函数,但您希望以可以使用explode函数的格式丰富您的数据。

    请参考以下代码以更好地理解 -

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.explode
    import scala.collection.JavaConverters._
    import java.util.HashMap
    import scala.collection.mutable.ArrayBuffer
    import org.codehaus.jackson.map.ObjectMapper
    import com.lbg.pas.alerts.realtime.notifications.common.entity.K3FLDEntity
    
    /**
     * Splits each Kafka message — a JSON object whose TOP-LEVEL keys are unknown
     * event categories — into one output row per inner event object.
     *
     * @param df    raw Kafka source DataFrame (binary key/value plus metadata)
     * @param spark active session, used only for its implicits
     * @return DataFrame with one row per inner JSON object, in column `value`,
     *         alongside the original key/partition/offset/timestamp columns
     */
    def translateSource(df: DataFrame, spark: SparkSession = SparkUtil.getSparkSession("")): DataFrame = {
      import spark.implicits._

      // Kafka delivers key/value as binary; cast everything to strings first.
      val dataSetSource = df
        .selectExpr("CAST(key AS STRING) key", "cast(value as string) value", "CAST(partition as String)", "CAST(offset as String)", "CAST(timestamp as String)")
        .select($"value", $"key", $"partition", $"offset", $"timestamp")
        .as[(String, String, String, String, String)]

      val dataSetTranslated = dataSetSource.map { case (jsonInput, key, partition, offset, timestamp) =>
        // The top-level keys are unknown, so deserialize into a generic
        // two-level map rather than a fixed case-class schema.
        val mapperObj = new ObjectMapper()
        val jsonMap = mapperObj.readValue(jsonInput, classOf[HashMap[String, HashMap[String, String]]])

        // Re-serialize each inner object back to a standalone JSON string.
        val jsonList = new ArrayBuffer[String]()
        for (inner <- jsonMap.asScala.values) {
          jsonList += mapperObj.writeValueAsString(inner)
        }
        K3FLDEntity(jsonList, key, partition, offset, timestamp)
      }
      // NOTE(review): the original appended `.as[(ArrayBuffer[String], String, String,
      // String, String)]` here. `map` already yields a typed Dataset[K3FLDEntity] with
      // columns JsonList/key/partition/offset/timestamp; `.as` resolves columns by the
      // encoder's field NAMES (_1.._5 for a tuple), which do not exist on this Dataset,
      // so the cast was redundant at best and a resolution error at worst — removed.

      // explode() turns the JsonList array into one row per inner JSON object.
      dataSetTranslated
        .withColumn("value", explode($"JsonList"))
        .drop("JsonList")
        .toDF()
    }
    

    我正在使用下面的案例类 -

    import scala.collection.mutable.ArrayBuffer
    
    /**
     * Intermediate row produced while splitting one Kafka message into many:
     * carries the per-event JSON strings plus the original Kafka metadata.
     *
     * @param JsonList  one serialized JSON string per inner event object
     * @param key       Kafka record key (stringified)
     * @param partition source partition (stringified)
     * @param offset    source offset (stringified)
     * @param timestamp record timestamp (stringified)
     */
    final case class K3FLDEntity(
                            JsonList: ArrayBuffer[String],
                            key: String,
                            partition: String,
                            offset: String,
                            timestamp: String
                          )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多