【发布时间】:2019-04-27 17:29:32
【问题描述】:
我正在尝试将数据从 hbase 表获取到 apache spark 环境中,但我无法弄清楚如何格式化它。谁能帮帮我。
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
type Record = (String, Option[String], Option[String])
val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam")
scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
| k6c| 0.056,NA,0.054|17.277,17.283,17.256|
| ad| NA,23.0| 24.0,23.6|
+--------------+-----------------+--------------------+
但是,我实际上希望它采用以下格式。每个逗号分隔的值都在新行中,每个 NA 都被 null 值替换。 iacp 和 temp 列中的值应为浮点类型。每行可以有不同数量的逗号分隔值。
提前致谢!
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7| 0.051| 17.326|
| ab7| 0.052| 17.344|
| ab7| 0.055| 17.21|
| k6c| 0.056| 17.277|
| k6c| null| 17.283|
| k6c| 0.054| 17.256|
| ad| null| 24.0|
| ad| 23| 26.0|
+--------------+-----------------+--------------------+
【问题讨论】:
标签: scala apache-spark apache-spark-sql hbase