前置条件:已开启 Hive metastore 服务,Spark 能够与 Hive 进行交互。


[hadoop@localhost data]$ spark-shell --driver-class-path /home/hadoop/app/hive-1.1.0-cdh5.7.0/lib/mysql-connector-java.jar
scala>  import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._

scala>  import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

scala> val lines=sc.textFile("/data/inputSource.txt")
lines: org.apache.spark.rdd.RDD[String] = /data/inputSource.txt MapPartitionsRDD[1] at textFile at <console>:32

## 进行 ETL:按制表符(\t)切分每一行,只保留需要的 8 个字段(每行至少要有 21 列,否则 str(20) 会数组越界)
scala> val data = lines.map(x => {
     |       val str = x.split("\t")
     |       (str(0),str(1),str(3),str(4),str(6),str(10),str(12),str(20))
     |     })
data: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String, String)] = MapPartitionsRDD[2] at map at <console>:33

##创建schema
scala> val schema = StructType(List(
     |   StructField("cdn",StringType,true),
     |   StructField("region",StringType,true),
     |   StructField("level",StringType,true),
     |   StructField("time",StringType,true),
     |   StructField("ip",StringType,true),
     |   StructField("domain",StringType,true),
     |   StructField("url",StringType,true),
     |   StructField("traffic",IntegerType,true)
     | ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(cdn,StringType,true), StructField(region,StringType,true), StructField(level,StringType,true), StructField(time,StringType,true), StructField(ip,StringType,true), StructField(domain,StringType,true), StructField(url,StringType,true), StructField(traffic,IntegerType,true))

scala> spark.sql("use g6hive")

##下面创建Row对象,每个Row对象都是rowRDD中的一行
scala> val rowRDD = data.map(x=>Row(x._1,x._2,x._3,x._4,x._5,x._6,x._7,x._8.trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:33

##建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
scala>  val df =  spark.createDataFrame(rowRDD,schema)
df: org.apache.spark.sql.DataFrame = [cdn: string, region: string ... 6 more fields]

## 下面注册临时表
scala>  df.createOrReplaceTempView("temptable")

## 查询创建的临时表
注意:这里的 temptable 不是 Hive 里面的表,而是基于 DataFrame 注册的临时视图,只在当前 SparkSession 中有效:
scala> val result = spark.sql("select * from temptable")
result: org.apache.spark.sql.DataFrame = [cdn: string, region: string ... 6 more fields]

## 作为ORC文件格式保存,保存为永久表
scala>  result.write.format("orc").saveAsTable("etl2orc")

##输出为parquet                         
scala> result.write.parquet("output_file_path.parquet")     
##输出为orc
scala> result.write.orc("output_file_path.orc")                
scala> 

利用sparkcore进行ETL并输出为orc,parquet文件

利用sparkcore进行ETL并输出为orc,parquet文件

利用sparkcore进行ETL并输出为orc,parquet文件

相关文章: