1. Environment
getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
//批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//流处理
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
parallelism.default: 1
Flink会自动判断是本地环境还是远程的集群环境。
2. Source
① 从集合读取数据
② 从文件中读取数据
③ 从kafka中读取数据
引入kafka连接器的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
④ 自定义source,传入SourceFunction
代码如下:
import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import scala.collection.immutable import scala.util.Random object SourceTest { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 1. 从集合中读取数据 温度传感器 id 时间戳 温度 /* val stream01 = env.fromCollection(List( SensorReading("sensor_1", 1547718199, 35.80018327300259), SensorReading("sensor_6", 1547718201, 15.402984393403084), SensorReading("sensor_7", 1547718202, 6.720945201171228), SensorReading("sensor_10", 1547718205, 38.101067604893444) )) stream01.print("stream01")*/ //env.fromElements("flink", 1, 32, 3213, 0.134).print("test_java") //从不同元素中读取 // 2. 从文件中读取数据 /* val dsTream02: DataStream[String] = env.readTextFile("F:\\BI\\code\\sensor.txt") dsTream02.print("stream02")*/ // 3. 从kafka中读取数据 // 创建kafka相关的配置 val properties = new Properties() properties.setProperty("bootstrap.servers", "hadoop101:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val stream03: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)) // 4. 自定义数据源 /*val stream04: DataStream[SensorReading] = env.addSource(new SensorSource()) stream04.print("stream04")*/ env.execute("SouceTest Start") } } // 定义数据样例类 温度传感器 case class SensorReading(id: String, timestamp: Long, temperature: Double) class SensorSource() extends SourceFunction[SensorReading]{ //定义一个flag:表示数据源是否还在正常运行 var running: Boolean = true //取消数据生成 override def cancel(): Unit = running = false //正常生成数据 override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = { // 创建一个随机数发生器 val rand: Random = new Random() // 随机初始换生成10个传感器的温度数据,之后在它基础随机波动生成流数据 var curTemp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map( i => ("sensor_" + i, 60 + rand.nextGaussian() * 20) //nextGaussian标准高斯随机分布--正态曲线 ) // 无限循环生成流数据,除非被cancel while (running){ //更新值的温度 在前一次温度的基础上更新温度值 curTemp = curTemp.map( t => (t._1, t._2 + rand.nextGaussian()) ) // 获取当前的时间戳 val curTime: Long = System.currentTimeMillis() // 包装成SensorReading,输出 curTemp.foreach( t => sourceContext.collect(SensorReading(t._1, curTime, t._2)) ) // 间隔100ms Thread.sleep(100) } } }