前置条件:已开启 Hive Metastore 服务(hive --service metastore),使 Spark 能与 Hive 进行交互。
[hadoop@hadoop001 data]$ spark-shell --driver-class-path /home/hadoop/app/hive-1.1.0-cdh5.7.0/lib/mysql-connector-java.jar
scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
scala> val lines=sc.textFile("/data/inputSource.txt")
lines: org.apache.spark.rdd.RDD[String] = /data/inputSource.txt MapPartitionsRDD[1] at textFile at <console>:32
## 进行 ETL:按制表符(\t)切分每行日志,只抽取所需的 8 个字段(第 0、1、3、4、6、10、12、20 列)
scala> val data = lines.map(x => {
| val str = x.split("\t")
| (str(0),str(1),str(3),str(4),str(6),str(10),str(12),str(20))
| })
data: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String, String)] = MapPartitionsRDD[2] at map at <console>:33
##创建schema
scala> val schema = StructType(List(
| StructField("cdn",StringType,true),
| StructField("region",StringType,true),
| StructField("level",StringType,true),
| StructField("time",StringType,true),
| StructField("ip",StringType,true),
| StructField("domain",StringType,true),
| StructField("url",StringType,true),
| StructField("traffic",IntegerType,true)
| ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(cdn,StringType,true), StructField(region,StringType,true), StructField(level,StringType,true), StructField(time,StringType,true), StructField(ip,StringType,true), StructField(domain,StringType,true), StructField(url,StringType,true), StructField(traffic,IntegerType,true))
scala> spark.sql("use g6hive")
##下面创建Row对象,每个Row对象都是rowRDD中的一行;注意最后一个字段需 trim 后转为 Int,以匹配 schema 中 traffic 的 IntegerType(若该字段含非数字内容,此处会抛出 NumberFormatException)
scala> val rowRDD = data.map(x=>Row(x._1,x._2,x._3,x._4,x._5,x._6,x._7,x._8.trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:33
##建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
scala> val df = spark.createDataFrame(rowRDD,schema)
df: org.apache.spark.sql.DataFrame = [cdn: string, region: string ... 6 more fields]
## 下面注册临时表
scala> df.createOrReplaceTempView("temptable")
## 查询创建的临时表
注意这里的表不是Hive里面的表,而是由DataFrame注册的临时视图(仅在当前SparkSession内有效):
scala> val result = spark.sql("select * from temptable")
result: org.apache.spark.sql.DataFrame = [cdn: string, region: string ... 6 more fields]
## 作为ORC文件格式保存,保存为永久表
scala> result.write.format("orc").saveAsTable("etl2orc")
##输出为parquet
scala> result.write.parquet("output_file_path.parquet")
##输出为orc
scala> result.write.orc("output_file_path.orc")
scala>