Key points

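A Table API and SQL program is structured much like a DataStream (stream processing) program. It can be thought of as roughly these steps: first create the execution environment, then define the source, the transformations, and the sink.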

1. Dependencies: the Table API and SQL require the following dependencies

    <!-- old planner (flink-table-planner) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <!--new planner-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
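
With both planner dependencies on the classpath, the planner can be chosen explicitly through `EnvironmentSettings`. The following is a minimal sketch for Flink 1.10, where `StreamTableEnvironment.create(env)` without settings falls back to the old planner. The object name `PlannerSelectionSketch` and the `useBlink` flag are hypothetical and only serve the illustration.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

// Hypothetical object name, for illustration only
object PlannerSelectionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Old planner in streaming mode (the default in 1.10 when no settings are passed)
    val oldSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()

    // Blink planner in streaming mode (requires flink-table-planner-blink)
    val blinkSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // Hypothetical switch; pick whichever planner the job should run on
    val useBlink = true
    val tableEnv: StreamTableEnvironment =
      StreamTableEnvironment.create(env, if (useBlink) blinkSettings else oldSettings)
  }
}
```

Passing the settings explicitly makes the choice visible in the code instead of relying on the version-dependent default.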

2. Code example

package table

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
/**
 * @author yangwj
 * @date 2021/1/12 21:17
 * @version 1.0
 */
object TableExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    // Parse each comma-separated line into a SensorReading
    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // Create the table environment on top of the stream execution environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Create a table from the data stream
    val table: Table = tableEnv.fromDataStream(dataStream)

    // 2. Transform the table with the Table API
    val result: Table = table.select("id,temperature").filter("id == 'sensor_1'")
    result.toAppendStream[(String, Double)].print("result")

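    // 3. The same query expressed in SQL
    // The view is registered as "tabel"; "table" itself is a reserved SQL keyword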
    tableEnv.createTemporaryView("tabel",table)
    val sql = "select id, temperature from tabel where id = 'sensor_1'"

    val sqlResult: Table = tableEnv.sqlQuery(sql)
    sqlResult.toAppendStream[(String, Double)].print("sqlResult")
    env.execute("table api")
  }
}
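
The example imports `SensorReading` from `com.yangwj.api` without showing its definition. A minimal sketch of what it presumably looks like follows. The fields `id` and `temperature` appear in the queries above, while the name `timestamp` for the second field, the helper `parseSensorReading`, the object `SensorParsing`, and the sample line are assumptions made for illustration.

```scala
// Assumed shape of SensorReading; the original class is not shown in this post.
// `id` and `temperature` are referenced by the queries; `timestamp` is a guess.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorParsing {
  // Hypothetical helper mirroring the map step of the example:
  // split one "id,timestamp,temperature" line and convert each field.
  def parseSensorReading(line: String): SensorReading = {
    val arr: Array[String] = line.split(",").map(_.trim)
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample line in the format the example expects
    println(parseSensorReading("sensor_1, 1547718199, 35.8"))
  }
}
```

Because `SensorReading` is a case class, `fromDataStream` can derive the table columns from its field names, which is why the queries can refer to `id` and `temperature` directly.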

 
