Fink| source| transform| sink

 

1. Environment

getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

//批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //流处理 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。 parallelism.default: 1

Flink会自动判断是本地环境还是远程的集群环境。

2. Source

① 从集合读取数据

② 从文件中读取数据

③ 从kafka中读取数据

  引入kafka连接器的依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>

④ 自定义source,传入SourceFunction

代码如下:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.collection.immutable
import scala.util.Random
object SourceTest {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. 从集合中读取数据 温度传感器 id 时间戳 温度
/*    val stream01 = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
    stream01.print("stream01")*/

    //env.fromElements("flink", 1, 32, 3213, 0.134).print("test_java")  //从不同元素中读取

    // 2. 从文件中读取数据
/*    val dsTream02: DataStream[String] = env.readTextFile("F:\\BI\\code\\sensor.txt")
    dsTream02.print("stream02")*/

    // 3. 从kafka中读取数据
    // 创建kafka相关的配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop101:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val stream03: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // 4. 自定义数据源
    /*val stream04: DataStream[SensorReading] = env.addSource(new SensorSource())
    stream04.print("stream04")*/

    env.execute("SouceTest Start")

  }
}
// 定义数据样例类 温度传感器
case class SensorReading(id: String, timestamp: Long, temperature: Double)

class SensorSource() extends SourceFunction[SensorReading]{
  //定义一个flag:表示数据源是否还在正常运行
  var running: Boolean = true
  //取消数据生成
  override def cancel(): Unit = running = false
  //正常生成数据
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // 创建一个随机数发生器
    val rand: Random = new Random()
    // 随机初始换生成10个传感器的温度数据,之后在它基础随机波动生成流数据
    var curTemp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(
      i => ("sensor_" + i, 60 + rand.nextGaussian() * 20) //nextGaussian标准高斯随机分布--正态曲线
    )
    // 无限循环生成流数据,除非被cancel
    while (running){
      //更新值的温度 在前一次温度的基础上更新温度值
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      // 获取当前的时间戳
      val curTime: Long = System.currentTimeMillis()
      // 包装成SensorReading,输出
      curTemp.foreach(
        t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
      )
      // 间隔100ms
      Thread.sleep(100)
    }

  }
}
View Code

相关文章:

  • 2022-02-17
  • 2021-12-06
  • 2021-10-19
  • 2022-12-23
  • 2022-12-23
  • 2021-11-20
  • 2021-12-13
  • 2021-07-23
猜你喜欢
  • 2022-12-23
  • 2021-09-29
  • 2021-06-23
  • 2022-12-23
  • 2022-02-08
  • 2021-04-07
  • 2021-04-25
相关资源
相似解决方案