Structrued Streaming业务数据实时分析

 

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

 

 

先启动spark-shell,记得启动nc服务

Structrued Streaming业务数据实时分析

 

输入以下代码

Structrued Streaming业务数据实时分析

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
18/03/21 20:55:13 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
lines: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4e260e04

 

 

 在nc输入几个单词

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

 

 我们再输入一些单词

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

 

 

我们改一下代码换成update模式

首先重新启动一次spark-shell,记得启动nc

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

 

 

 Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

换成append模式

Structrued Streaming业务数据实时分析

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()
18/03/21 21:32:30 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
lines: org.apache.spark.sql.DataFrame = [value: string]

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val query = words.writeStream.outputMode("append").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19d85bbe

 

 

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

 

 

 Structrued Streaming业务数据实时分析

 

Structrued Streaming业务数据实时分析

因为我们之前的kafka的版本低了,我下载一个0.10.0版本的

下载地址 http://kafka.apache.org/downloads

Structrued Streaming业务数据实时分析

 我们把kafka0.9版本的配置文件直接复制过来

为了快一点我直接在虚拟机里操作了

复制这几个配置文件

Structrued Streaming业务数据实时分析

把kafka0.10的覆盖掉

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 修改一下配置文件

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 把kafka分发都另外的两个节点去

 Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 

在节点2和节点3也把相应的配置文件修改一下

server.properties

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 

Structrued Streaming业务数据实时分析

 

 

Structrued Streaming业务数据实时分析

 

在idea里重新建一个scala类

 Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

加上如下代码

Structrued Streaming业务数据实时分析

 

package com.spark.test

import org.apache.spark
import org.apache.spark.sql.SparkSession

object StructuredStreamingKafka {
  def main(args: Array[String]): Unit = {
     val spark=SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()
    import spark.implicits._
   val lines= df.selectExpr("CAST(value AS STRING)").as[String]

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

 

跑一下我们的程序

Structrued Streaming业务数据实时分析

 如果报错了提示需要0.10版本的可以先不用管

我们启动一下kafka

Structrued Streaming业务数据实时分析

 

Structrued Streaming业务数据实时分析

 

 

 可以看到程序已经在跑了

Structrued Streaming业务数据实时分析

 

 

我们在kafak里创建一个生产者

Structrued Streaming业务数据实时分析

bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

 

 

 我们输入几个单词

 Structrued Streaming业务数据实时分析

 

可以看到idea这边的结果

Structrued Streaming业务数据实时分析

 

我们可以换成update模式

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 程序跑起来了

Structrued Streaming业务数据实时分析

 

输入单词

Structrued Streaming业务数据实时分析

 

 这个是运行的结果

Structrued Streaming业务数据实时分析

 

 

Structrued Streaming业务数据实时分析

 

 我们把包上传上来(3个节点都这样做)

Structrued Streaming业务数据实时分析

 

 

启动spark-shell

Structrued Streaming业务数据实时分析

 

把代码拷贝进来

Structrued Streaming业务数据实时分析

 

 val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()
    import spark.implicits._
   val lines= df.selectExpr("CAST(value AS STRING)").as[String]

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()

 

 这个时候一定要保持kafka和生产者是开启的

 我在生产者这边输入几个单词

Structrued Streaming业务数据实时分析

 

 回到spark-shell界面可以看到统计结果

Structrued Streaming业务数据实时分析

 

 

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

 

我们先把mysqld的test数据库的webCount的表的内容清除

Structrued Streaming业务数据实时分析

 

打开idea,我们编写两个程序

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

package com.spark.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

/**
  * Created by Administrator on 2017/10/16.
  */
object StructuredStreamingKafka {

  case class Weblog(datatime:String,
                    userid:String,
                    searchname:String,
                    retorder:String,
                    cliorder:String,
                    cliurl:String)

  def main(args: Array[String]): Unit = {

    val spark  = SparkSession.builder()
      .master("local[2]")
      .appName("streaming").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2),x(3),x(4),x(5)))
    val titleCount = weblog
      .groupBy("searchname").count().toDF("titleName","count")

    val url ="jdbc:mysql://bigdata-pro01.kfk.com:3306/test"
    val username="root"
    val password="root"

    val writer = new JDBCSink(url,username,password)
    val query = titleCount.writeStream
      .foreach(writer)
      .outputMode("update")
        //.format("console")
      .trigger(ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
  }

}

 

 

package com.spark.test

import java.sql._
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

/**
  * Created by Administrator on 2017/10/17.
  */
class JDBCSink(url:String, username:String,password:String) extends ForeachWriter[Row]{

  var statement : Statement =_
  var resultSet : ResultSet =_
  var connection : Connection=_
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    //  connection = new MySqlPool(url,username,password).getJdbcConn();
    connection = DriverManager.getConnection(url,username,password);
      statement = connection.createStatement()
      return true
  }

  override def process(value: Row): Unit = {
    val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]","")
    val count = value.getAs[Long]("count");

    val querySql = "select 1 from webCount " +
      "where titleName = '"+titleName+"'"

    val updateSql = "update webCount set " +
      "count = "+count+" where titleName = '"+titleName+"'"

    val insertSql = "insert into webCount(titleName,count)" +
      "values('"+titleName+"',"+count+")"

    try{


      var resultSet = statement.executeQuery(querySql)
      if(resultSet.next()){
        statement.executeUpdate(updateSql)
      }else{
        statement.execute(insertSql)
      }
    }catch {
      case ex: SQLException => {
        println("SQLException")
      }
      case ex: Exception => {
        println("Exception")
      }
      case ex: RuntimeException => {
        println("RuntimeException")
      }
      case ex: Throwable => {
        println("Throwable")
      }
    }

  }

  override def close(errorOrNull: Throwable): Unit = {
//    if(resultSet.wasNull()){
//      resultSet.close()
//    }
    if(statement==null){
      statement.close()
    }
    if(connection==null){
      connection.close()
    }
  }

}

 

在pom.xml文件里添加这个依赖包

Structrued Streaming业务数据实时分析

 

<dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
    </dependency>

 

 

我在这里说一下这个依赖包版本的选择上最好要跟你集群里面的依赖包版本一样,不然可能会报错的,可以参考hive里的Lib路径下的版本

 

 

 

 

 保持集群的dfs,hbase,yarn,zookeeper,都是启动的状态

Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 

Structrued Streaming业务数据实时分析

 

 启动我们节点1和节点2的flume,在启动之前我们先修改一下flume的配置,因为我们把jdk版本和kafka版本后面更换了,所以我们要修改配置文件(3个节点的都改)

 Structrued Streaming业务数据实时分析

 

 启动节点1的flume

Structrued Streaming业务数据实时分析

 

 启动节点1的kafka

bin/kafka-server-start.sh config/server.properties

 Structrued Streaming业务数据实时分析

 

启动节点2的flume

Structrued Streaming业务数据实时分析

 

在节点2上把数据启动起来,实时产生数据

Structrued Streaming业务数据实时分析

 

 回到idea我们把程序运行一下

 Structrued Streaming业务数据实时分析

 

注意了,现在程序是没有报错的,因为我前期工作做得不是太好,给idea分配的内存小了,所以跑得很慢

 Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 

回到mysql里面查看webCount表,已经有数据进来了

Structrued Streaming业务数据实时分析

 

 

Structrued Streaming业务数据实时分析

 

 

 Structrued Streaming业务数据实时分析

Structrued Streaming业务数据实时分析

 

 

我们把配置文件修改如下

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

[client]
socket=/var/lib/mysql/mysql.sock
default-character-set=utf8

[mysqld]
character-set-server=utf8
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

[mysql]
default-character-set=utf8

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

 

 

 把表删除了

Structrued Streaming业务数据实时分析

 

 

 重新创建表

Structrued Streaming业务数据实时分析

create table webCount( titleName varchar(255) CHARACTER SET utf8 DEFAULT NULL, count int(11) DEFAULT NULL )ENGINE=lnnoDB DEFAULT CHARSET=utf8;

 

 

重新在运行一次程序

Structrued Streaming业务数据实时分析

 

 Structrued Streaming业务数据实时分析

可以看到没有中文乱码了。

 

同时我们通过可视化工具连接mysql查看

 Structrued Streaming业务数据实时分析

 

相关文章:

  • 2021-08-21
  • 2021-12-05
  • 2021-12-05
  • 2021-08-09
  • 2022-12-23
  • 2021-11-24
猜你喜欢
  • 2021-12-19
  • 2021-07-07
  • 2021-11-23
  • 2021-06-22
  • 2021-09-20
  • 2021-11-19
相关资源
相似解决方案