【Question Title】: scala spark convert a struct type column to json data
【Posted】: 2018-06-30 09:54:51
【Question Description】:

I'm trying to aggregate a few fields in my dataset and convert them into a JSON-array-like format. I've been adding the ":" separator manually with the concat_ws and lit functions, but I believe there should be a better way to do this. Below is the code I've tried so far. I'm on Spark version 2.0.1, so no luck with the to_json function.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.concat_ws
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.functions.udf

object Zipper {
  val warehouseLocation = "file:///${system:user.dir}//spark-warehouse"
  val spark = SparkSession
    .builder()
    .appName("jsonconvert")
    .config("spark.master", "local")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .getOrCreate()
  import spark.implicits._

  def main(args: Array[String]): Unit = {
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
    df.show(false)

    df.groupBy($"name")
      .agg(collect_list(struct(
        concat_ws(":", lit("food"), $"food"),
        concat_ws(":", lit("price"), $"price")
      )).as("foods"))
      .show(false)
  }
}



+----+------------------------------------------------------------------------------+
|name|foods                                                                         |
+----+------------------------------------------------------------------------------+
|john|[[food:tomato,price:1.99], [food:carrot,price:0.45], [food:banana,price:1.29]]|
|bill|[[food:apple,price:0.99], [food:taco,price:2.59]]                             |
+----+------------------------------------------------------------------------------+

Expected output

+----+------------------------------------------------------------------------------------------------+
|name|foods                                                                                           |
+----+------------------------------------------------------------------------------------------------+
|john|[{"food":"tomato","price":1.99}, {"food":"carrot","price":0.45}, {"food":"banana","price":1.29}]|
|bill|[{"food":"apple","price":0.99}, {"food":"taco","price":2.59}]                                   |
+----+------------------------------------------------------------------------------------------------+
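To make the target format concrete, here is a plain-Scala sketch (no Spark required) that groups the same rows by name and renders each (food, price) pair as a JSON object string. This only illustrates the desired output shape; it is not a substitute for doing the aggregation inside Spark:

```scala
// Plain-Scala illustration of the target format:
// group rows by name, then render each (food, price) pair as a JSON object.
val rows = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
)

val foodsByName: Map[String, String] = rows
  .groupBy(_._1) // Seq#groupBy keeps input order within each group
  .map { case (name, items) =>
    val objs = items.map { case (_, food, price) =>
      s"""{"food":"$food","price":$price}"""
    }
    name -> objs.mkString("[", ", ", "]")
  }

println(foodsByName("john"))
```

This produces exactly the strings shown in the expected output table above.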

【Question Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    For Spark versions before 2.1, try aggregating (food, price) by name, applying toJSON to the DataFrame, and extracting the JSON objects as follows:

    import org.apache.spark.sql.functions._
    
    df.groupBy($"name").agg(collect_list(struct($"food", $"price")).as("food_price")).
      toJSON.
      select(
        get_json_object($"value", "$.name").as("name"),
        get_json_object($"value", "$.food_price").as("foods")
      ).
      show(false)
    // +----+----------------------------------------------------------------------------------------------+
    // |name|foods                                                                                         |
    // +----+----------------------------------------------------------------------------------------------+
    // |john|[{"food":"tomato","price":1.99},{"food":"carrot","price":0.45},{"food":"banana","price":1.29}]|
    // |bill|[{"food":"apple","price":0.99},{"food":"taco","price":2.59}]                                  |
    // +----+----------------------------------------------------------------------------------------------+
    

    【Comments】:

    • As mentioned in the question, I'm on an older Spark version that doesn't have the to_json function out of the box.
    • Missed that, my bad. Please check whether the revised solution works for you.
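Not from either answer, but another option on 2.0.x is to hand-roll the serialization in a plain Scala function and wrap it with `udf(...)` before the `collect_list`. A hypothetical sketch of such a serializer (the `escape`/`toJsonObj` names are mine), including basic string escaping so quotes in food names don't break the JSON:

```scala
// Hypothetical helper for Spark 2.0.x: serialize one (food, price) pair to a
// JSON object string. In Spark it could be registered as udf(toJsonObj _);
// shown here as plain Scala.
def escape(s: String): String =
  s.flatMap {
    case '"'  => "\\\"" // escape embedded double quotes
    case '\\' => "\\\\" // escape backslashes
    case c    => c.toString
  }

def toJsonObj(food: String, price: Double): String =
  s"""{"food":"${escape(food)}","price":$price}"""

println(toJsonObj("taco", 2.59))
```

Unlike the `toJSON`-then-`get_json_object` route, this keeps the serialization explicit, at the cost of maintaining the escaping yourself.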
    【Solution 2】:
        import org.apache.spark.sql._
        import org.apache.spark.sql.functions._

        val df = Seq(
          ("john", "tomato", 1.99),
          ("john", "carrot", 0.45),
          ("bill", "apple", 0.99),
          ("john", "banana", 1.29),
          ("bill", "taco", 2.59)
        ).toDF("name", "food", "price")

        val vkDF2 = df.groupBy("name").agg(collect_list(struct(col("food"), col("price"))).alias("vaquarkhan_json"))

        vkDF2.show()
    
    **Results:**
    
    +----+--------------------+
    |name|     vaquarkhan_json|
    +----+--------------------+
    |john|[[tomato,1.99], [...|
    |bill|[[apple,0.99], [t...|
    +----+--------------------+
    
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    df: org.apache.spark.sql.DataFrame = [name: string, food: string ... 1 more field]
    vkDF2: org.apache.spark.sql.DataFrame = [name: string, vaquarkhan_json: array<struct<food:string,price:double>>]
    

    【Comments】:
