【问题标题】:Hbase Spark RDD JSON ColumnHbase Spark RDD JSON 列
【发布时间】:2016-07-12 15:38:53
【问题描述】:

我正在使用 nerdammer hbase spark 连接器并读取两个 hbase 表,因为 RDD 将它们转换为数据帧并运行 SQL 以使其按预期工作。

其中一个表中的一列具有 JSON 对象,我需要在最终结果中提取特定的 JSON 属性值,这怎么可能。 如果我在 ARDD 的 D 列中有 Json 数据,例如 [{"foo":"bar","baz":"qux"}] 我需要创建新的 RDD 或 DF,其值仅在此列中为 "baz"所以最后当我加入时,我只得到这个属性的值。

 val ARDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](ATableName)
        .select("A","B","C","D","E").inColumnFamily("pCF")

        val BRDD = sc.hbaseTable[(Option[String],Option[String], Option[String], Option[String], Option[String], Option[String],Option[String])](BTableName)
        .select("A","B","C","D","E","F").inColumnFamily("tCF")


    val ADF = sqlContext.createDataFrame(ARDD).registerTempTable("aDF")
    val BDF = sqlContext.createDataFrame(BRDD).registerTempTable("bDF")

val resultset = sqlContext.sql("SELECT aDF._1, bDF._2, bDF._3, bDF._4, bDF._5, bDF._6, bDF._3, aDF._1, aDF._2, bDF._1 FROM aDF, bDFWHERE aDF._5 = bDF._7").collect()

val joinedResult = resultset.foreach(println)
  println("Count " + joinedResult)

【问题讨论】:

    标签: apache-spark hbase


    【解决方案1】:

    创建了一个 UDF 来实现这一点,并在我的 DF 中创建了包含已解析信息的新列

    import org.json4s.jackson.JsonMethods._
    import org.apache.spark.sql.functions.udf
    def udfScoreToCategory=udf((t: String) => {
       compact((parse(t.toString,true) \ "myField"))})
    
    
    val abc=  myDF.withColumn("_p", udfScoreToCategory(myDF("_4"))).registerTempTable("odDF")
    

    【讨论】:

      猜你喜欢
      • 2015-01-30
      • 1970-01-01
      • 1970-01-01
      • 2017-05-06
      • 2018-04-05
      • 2017-10-27
      • 1970-01-01
      • 1970-01-01
      • 2015-07-14
      相关资源
      最近更新 更多