【问题标题】:Convert RDD[Map[String, String]] to Spark dataframe将 RDD[Map[String, String]] 转换为 Spark 数据帧
【发布时间】:2017-08-24 10:51:39
【问题描述】:

我正在尝试将 val rec: RDD[Map[String, String]] 转换为 Spark 数据帧。

但是当我执行时:

val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(rec, classOf[Map[String, String]])

df.write.json("/tmp/file.json") 

文件 json 中充满了空对象:

{}
{}
{}
{}
{}

我将它转换为 json 只是因为我想保存 rec val 并稍后在 python 中使用 SQLContext 对象重用它。

那么问题是如何保存我在 Scala 中创建的 RDD[HashMap[String, String]] 并在以后在 Python 中重用?

更新

rec val 包含

Map(Param_timestamp -> 2017-03-28T02:00:02.887, Param_querytype -> listing, Param_slug -> /salute-beauty-fitness/bellezza-cura-del-corpo/cosmesi/makeup, Param_br -> CAUDALIE)

df.show() 返回:

++
||
++
||
... all the 20 lines are the alike "||"
||
++
only showing top 20 rows

【问题讨论】:

  • rec 的内容是什么? df.show() 的输出是什么?
  • @L.CWI 刚刚更新了我的问题
  • 在你的 createDataframe 中使用StructType( StructField("timeStamp", StringType, false) :: StructField("querytype", StringType, false) :: StructField("slug", StringType, false):: StructField("br", StringType, false) :: Nil) 怎么样? (我对 scala 了解不多,但是在创建数据框时您需要以某种方式指定架构,并且由于它是空的,我认为这里使用了错误的架构)
  • 但是我有一个hashmap,hashmap里面的key应该是字段名,value都是String吧?
  • 我有动态数量的字段会在运行时生成,所以应该动态确定 StructType。

标签: python json scala apache-spark rdd


【解决方案1】:

只要你掌握了你的模式,你就可以通过使用 StructField 和 StructType 重新创建它,我相信doc 解释得很好。至于 scala,我对它不是很熟悉,但是 Java 中的一个小例子可能会有所帮助(稍后我有更多时间会将其转换为 Scala):

    JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("test").setMaster("local[*]"));
    jsc.setLogLevel("ERROR");
    System.setProperty("hadoop.home.dir", "C:\\Temp\\tt\\Tools");

    List<Tuple2<String, String>> test = new ArrayList<Tuple2<String, String>>();
    Tuple2<String, String> tt = new Tuple2<String, String>("key", "val1");
    test.add(tt);
    tt = new Tuple2<String, String>("key", "val2");
    test.add(tt);
    tt = new Tuple2<String, String>("key2", "val");
    test.add(tt);

    JavaPairRDD<String, String> testRDD = jsc.parallelizePairs(test);

    System.out.println(testRDD.first());

    SparkContext sc = JavaSparkContext.toSparkContext(jsc);
    SparkSession ss = new SparkSession(sc);
    StructField[] fields = {
            DataTypes.createStructField("key", DataTypes.StringType, false),
            DataTypes.createStructField("val", DataTypes.StringType, false) };
    StructType schema = DataTypes.createStructType(fields);
    JavaRDD<Row> testRowRDD = testRDD.map(line -> RowFactory.create(line._1, line._2));
    Dataset<Row> myDF = ss.createDataFrame(testRowRDD, schema);
    myDF.show();

    myDF.write().json("test.json");

    jsc.close();

输出是几个Json文件,每一行包含如下:

{"key":"key2","val":"val"}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-09-28
    • 2021-08-09
    • 1970-01-01
    • 2020-03-09
    • 2015-12-11
    • 1970-01-01
    • 2017-01-29
    相关资源
    最近更新 更多