【问题标题】:Spark How to RDD[JSONObject] to DatasetSpark 如何将 RDD[JSONObject] 转换为数据集
【发布时间】:2017-07-08 08:27:01
【问题描述】:

我正在从 com.google.gson.JsonObject 类型的 Element 的 RDD 中读取数据。试图将其转换为 DataSet 但不知道如何执行此操作。

import com.google.gson.{JsonParser}
import org.apache.hadoop.io.LongWritable
import org.apache.spark.sql.{SparkSession}

object tmp {
  class people(name: String, age: Long, phone: String)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    val parser = new JsonParser();
    val jsonObject1 = parser.parse("""{"name":"abc","age":23,"phone":"0208"}""").getAsJsonObject()
    val jsonObject2 = parser.parse("""{"name":"xyz","age":33}""").getAsJsonObject()

    val PairRDD = sc.parallelize(List(
      (new LongWritable(1l), jsonObject1),
      (new LongWritable(2l), jsonObject2)
    ))

    val rdd1 =PairRDD.map(element => element._2)

    import spark.implicits._

    //How to create Dataset as schema People from rdd1?
  }
}

即使尝试打印 rdd1 元素也会抛出

object not serializable (class: org.apache.hadoop.io.LongWritable, value: 1)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (1,{"name":"abc","age":23,"phone":"0208"}))

基本上,我从BigQuery 表中获得了这个 RDD[LongWritable,JsonParser],我想将它转换为 Dataset,以便我可以应用 SQL 进行转换。

我故意将 phone 留在第二条记录中,BigQuery 不为该元素返回 null 值。

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    感谢您的澄清。您需要在 kryo 中将类注册为 Serializable。以下展示作品。我在 spark-shell 中运行,因此必须销毁旧上下文并使用包含已注册 Kryo 类的配置创建一个新的 spark 上下文

    import com.google.gson.{JsonParser}
    import org.apache.hadoop.io.LongWritable
    import org.apache.spark.SparkContext
    
    sc.stop()
    
    val conf = sc.getConf
    conf.registerKryoClasses( Array(classOf[LongWritable], classOf[JsonParser] ))
    conf.get("spark.kryo.classesToRegister")
    
    val sc = new SparkContext(conf)
    
    val parser = new JsonParser();
    val jsonObject1 = parser.parse("""{"name":"abc","age":23,"phone":"0208"}""").getAsJsonObject()
    val jsonObject2 = parser.parse("""{"name":"xyz","age":33}""").getAsJsonObject()
    
    val pairRDD = sc.parallelize(List(
      (new LongWritable(1l), jsonObject1),
      (new LongWritable(2l), jsonObject2)
    ))
    
    
    val rdd = pairRDD.map(element => element._2)
    
    rdd.collect()
    // res9: Array[com.google.gson.JsonObject] = Array({"name":"abc","age":23,"phone":"0208"}, {"name":"xyz","age":33})
    
    val jsonstrs = rdd.map(e=>e.toString).collect()
    val df = spark.read.json( sc.parallelize(jsonstrs) )    
    df.printSchema
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
    //  |-- phone: string (nullable = true)
    

    【讨论】:

    • 感谢 Shoaib,如果能提供更多想法,我已经编辑了我的问题。
    猜你喜欢
    • 2018-06-14
    • 2016-12-12
    • 1970-01-01
    • 2015-02-27
    • 2019-12-20
    • 2017-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多