【问题标题】:How do I groupby and aggregate column in Spark and create nested Json如何在 Spark 中分组和聚合列并创建嵌套的 Json
【发布时间】:2017-08-13 14:30:37
【问题描述】:

我有这样的数据,我想创建以下 JSON 文档。 如何在 Spark 中实现它?在 Spark 中最有效的方法是什么?

 name|contact           |type
    jack|123-123-1234       |phone
    jack|jack.reach@xyz.com |email
    jack|123 main street    |address
    jack|34545544445        |mobile

       {
         "name" : "jack",
         "contacts":[
         {
           "contact" : "123-123-1234",
           "type" : "phone"
         },
         {
           "contact" : "jack.reach@xyz.com",
           "type" : "email"
         },
        {
           "contact" : "123 main street",
            "type" : "address"
        },
        {
           "contact" : "34545544445",
           "type" : "mobile"
        }
      ]
    }

这只是我提供的一个示例用例。我有大数据集在哪里 我必须将多列行折叠成一行并进行一些分组 逻辑。

我当前的方法是编写一个读取每一行的 UDAF,存储在 缓冲并合并它。所以代码是

val mergeUDAF = new ColumnUDAF

val tempTable = inputTable.withColumn("contacts",struct($"contact",$"type")
val outputTable = tempTable.groupby($"name").agg(mergeUDAF($"contacts").alias("contacts"))

我正在尝试找出其他方法。我是 尝试使用 Spark-SQL 实现这一目标。

【问题讨论】:

  • 请为该问题添加更多信息。展示你的尝试。
  • @Thiago Baldim 抱歉,更详细地更新了问题。谢谢。

标签: apache-spark group-by apache-spark-sql aggregation


【解决方案1】:

我认为你应该从你的 csv 数据中创建一个 RDD,按“名称”分组而不是映射到 json 字符串:

 val data = sc.parallelize(Seq("jack|123-123-1234|phone", "jack|jack.reach@xyz.com |email", "david|123 main street|address", "david|34545544445|mobile")) // change to load your data as RDD

 val result = data.map(_.split('|')).groupBy(a => a(0)).map(a => {
    val contact = a._2.map(c => s"""{"contact": "${c(1)}", "type": "${c(2)}" }""" ).mkString(",")
    s"""{"name": "${a._1}", "contacts":[ ${contact}] }"""
  }).collect.mkString(",")

  val json = s"""[ ${result} ] """ 

【讨论】:

    【解决方案2】:
    case class contact(contact:String,contactType:String)
    case class Person(name:String,contact:Seq[contact])
        object SparkTestGrouping {
    
          def main(args: Array[String]): Unit = {
    
            val conf = new SparkConf().setAppName("LocalTest").setMaster("local")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)
            import sqlContext.implicits._
    
    
            val inputData=Seq("jack|123-123-1234|phone","jack|jack.reach@xyz.com|email","jack|123 main street|address","jack|34545544445|mobile")
    
    
            val finalData = sc.parallelize(inputData)
    
            val convertData = finalData.map(_.split('|'))
              .map(line => (line(0),Seq(line(1) +"|" +line(2))))
              .reduceByKey((x,y) => x ++: y)
    
              val output = convertData.map(line => (line._1,line._2.map(_.split('|')).map(obj => contact(obj(0),obj(1)))))
    
            val finalOutput = output.map(line => Person(line._1,line._2))
    
            finalOutput.toDF().toJSON.foreach(println)
    
            sc.stop()
    
          }
    
        }
    

    您可以使用关键字段从数据中创建元组并使用 reducebyKey 对数据进行分组。在上面的例子中,我创建了一个元组 (name,Seq("contact|contactType")) 并使用 reducebykey 对 按名称的数据。数据分组后,可以使用case类来 如果您需要进一步加入,请转换为 DataFrame 和 DataSets 或者只需要创建 json 文档。

    【讨论】:

    • 虽然您发布的代码显然适用于 OP,但请发布一些说明以帮助该网站的未来访问者了解您所做的事情。
    猜你喜欢
    • 1970-01-01
    • 2017-05-19
    • 2019-03-15
    • 2021-04-19
    • 2017-12-18
    • 1970-01-01
    • 1970-01-01
    • 2022-12-07
    • 1970-01-01
    相关资源
    最近更新 更多