/**
 * Created by 清风笑丶 Cotter on 2019/6/6.
 *
 * Stateful word count over a socket text stream: keeps a running total
 * per word across batches via updateStateByKey. State persistence
 * requires a checkpoint directory.
 */
object StateFulWordCount {

  // NOTE(review): rewritten from `extends App` to an explicit main() —
  // the Spark docs warn that subclasses of scala.App may not work
  // correctly (DelayedInit defers field initialization, which breaks
  // fields captured in Spark closures).
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Stateful WordCount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // updateStateByKey mandates checkpointing to persist state between batches.
    ssc.sparkContext.setCheckpointDir("./checkpoint")

    val line = ssc.socketTextStream("datanode1", 9999)
    val words = line.flatMap(_.split(" "))
    val word2Count = words.map((_, 1))

    // Fold this batch's counts into the running total for each word.
    // Equivalent to the original None/Some match, written with getOrElse.
    val state = word2Count.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))
    }
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Created by 清风笑丶 Cotter on 2019/6/6.
 *
 * Windowed word count over a socket text stream: sums counts over a
 * 15-second window, sliding every 5 seconds, via reduceByKeyAndWindow.
 */
object PairDstreamFunctions {

  // NOTE(review): explicit main() instead of `extends App` — scala.App's
  // DelayedInit is documented as unreliable with Spark closures.
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("stateful").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setCheckpointDir("./checkpoint")

    val lines = ssc.socketTextStream("datanode1", 9999)
    val words = lines.flatMap(_.split(" "))
    val word2Count = words.map((_, 1))

    // Window = 15s, slide = 5s; both must be multiples of the 5s batch interval.
    val state = word2Count.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(5))
    state.print()

    // BUG FIX: the original never started the StreamingContext, so the
    // job was defined but never executed.
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Created by 清风笑丶 Cotter on 2019/6/6.
 *
 * Word count over a socket text stream, writing each batch's results to
 * a MySQL table. Follows the recommended foreachRDD pattern: one pooled
 * connection per partition, reused for all records in that partition.
 */
object DStream2Mysql {

  // NOTE(review): explicit main() instead of `extends App` — scala.App's
  // DelayedInit is documented as unreliable with Spark closures.
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("stateful").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("datanode1", 9999)
    val words = lines.flatMap(_.split(" "))
    val word2Count = words.map((_, 1)).reduceByKey(_ + _)
    word2Count.print()

    word2Count.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        try {
          // SECURITY FIX: the original built SQL by string concatenation,
          // which is injectable and breaks on words containing a quote.
          // A PreparedStatement parameterizes both columns.
          val stmt = connection.prepareStatement(
            "insert into streaming_wordCount(item,count) values(?,?)")
          try {
            partitionOfRecords.foreach { case (word, count) =>
              stmt.setString(1, word)
              stmt.setInt(2, count)
              stmt.executeUpdate()
            }
          } finally {
            // LEAK FIX: the original never closed its Statement objects
            // (one was created per record and leaked).
            stmt.close()
          }
        } finally {
          // LEAK FIX: return the connection to the pool even when an
          // insert throws; the original skipped this on failure.
          ConnectionPool.returnConnection(connection) // return to the pool for future reuse
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Created by 清风笑丶 Cotter on 2019/6/7.
 *
 * Word count that filters blacklisted words using a lazily registered
 * broadcast variable, and counts the dropped occurrences with a lazily
 * registered accumulator (the recoverable-singleton pattern from the
 * Spark Streaming programming guide).
 */
object Accumulators {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Streaming_AccumulatorAndBroadcast").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("datanode1", 9999)
    val words = lines.flatMap(_.split(","))
    // Map to (word, 1) pairs and sum per word. (Translated from the
    // original Chinese comment: "convert to tuples for accumulation".)
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)

    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
      println(output)
    }

    // BUG FIX: the original never started the StreamingContext (and left
    // main/object unclosed), so the job was defined but never executed.
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Created by 清风笑丶 Cotter on 2019/6/7.
 *
 * Runs Spark SQL over each micro-batch of a socket text stream: every
 * RDD is converted to a DataFrame, registered as a temp view, and
 * word-counted with a SQL query (the DataFrame-in-foreachRDD pattern
 * from the Spark Streaming programming guide).
 */
object StreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreaming SQL").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val linesDStream = ssc.socketTextStream("datanode1", 9999)

    linesDStream.foreachRDD { rdd =>
      // Get the singleton instance of SparkSession (reuses the streaming
      // context's SparkConf so only one session exists per JVM).
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("word")

      // Create a temporary view
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
    // STRUCTURAL FIX: the original text ended without closing main() or
    // the object; braces restored so the unit is syntactically complete.
  }
}