package cn.piesat

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkApp {

  /**
   * Entry point: a Spark Streaming word count over a TCP socket.
   *
   * Connects to localhost:9999, collects lines in 5-second micro-batches,
   * splits each line on spaces, and for every batch runs a Spark SQL
   * aggregation that prints how often each word occurred in that batch.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("NetworkWordCount")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    // One word per DStream record.
    val words = streamingContext
      .socketTextStream("localhost", 9999)
      .flatMap(line => line.split(" "))

    words.foreachRDD { rdd =>
      // Reuse (or lazily create) the singleton SparkSession so SQL is
      // available inside each batch; config comes from the running context.
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Wrap each word in a Record so toDF() produces a "word" column.
      val wordsFrame = rdd.map(word => Record(word)).toDF()
      wordsFrame.createOrReplaceTempView("words")

      // Counts are per-batch only: each micro-batch gets a fresh view.
      val counts = spark.sql("select word,count(*) as total from words group by word")
      counts.show()
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

/** A single word wrapped as a row type, so an `RDD[Record]` converts via
  * `toDF()` into a DataFrame with one string column named "word".
  *
  * `val` on a case-class parameter is redundant (parameters are vals already),
  * and the empty body braces were dropped; `final` prevents subclassing,
  * the recommended default for case classes.
  *
  * @param word the word carried by this row
  */
final case class Record(word: String)

// Related articles:
//   • 2021-09-06
//   • 2021-07-09
//   • 2021-11-09
//   • 2021-04-25
//   • 2021-11-14
//   • 2021-11-19
//   • 2022-12-23
//   • 2022-12-23
// You may also like:
//   • 2022-01-05
//   • 2021-10-30
//   • 2021-09-09
//   • 2021-09-18
//   • 2021-10-20
//   • 2022-12-23
//   • 2022-12-23
// Related resources
// Similar solutions