【问题标题】:Using Apache Flink to consume from a Kafka topic then processing the stream with Flink CEP使用 Apache Flink 从 Kafka 主题消费,然后使用 Flink CEP 处理流
【发布时间】:2021-07-06 21:22:00
【问题描述】:

在这个项目中,我尝试使用 Flink 使用来自 Kafka 主题的数据,然后处理流以使用 Flink CEP 检测模式。 使用 Kafka 连接的部分工作并正在获取数据,但 CEP 部分由于某种原因不起作用。 我在这个项目中使用 scala。

build.sbt:


version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.12.2"

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0"

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.12.2"


libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.12.2" 

主要代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema

import java.util
import java.util.Properties
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.pattern.conditions.IterativeCondition

object flinkExample {
  def main(args: Array[String]): Unit = {


    val CLOSE_THRESHOLD: Double = 140.00

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest




    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val src: DataStream[String] = see.addSource(consumer)

    
    val keyedStream: DataStream[Stock] = src.map(v => v)
      .map {
        v =>
          val data = v.split(":")

          val date = data(0)
          val close = data(1).toDouble
          Stock(date,close)
      }

    val pat = Pattern
      .begin[Stock]("start")
      .where(_.Adj_Close > CLOSE_THRESHOLD)


    val patternStream = CEP.pattern(keyedStream, pat)

    val result = patternStream.select(
      patternSelectFunction = new PatternSelectFunction[Stock, String]() {
        override def select(pattern: util.Map[String, util.List[Stock]]): String = {
          val data = pattern.get("first").get(0)

          data.toString
        }
      }
    )

    result.print()

    see.execute("ASK Flink Kafka")

  }

  case class Stock(date: String,
                   Adj_Close: Double)
  {
    override def toString: String = s"Stock date: $date, Adj Close: $Adj_Close"
  }

}

来自 Kafka 的数据是字符串格式:“date:value”

Scala 版本:2.11.12 Flink 版本:1.12.2 卡夫卡版本:2.3.0

我正在使用:sbt 程序集构建项目,然后在 flink 仪表板中部署 jar。

【问题讨论】:

  • “它不起作用”是什么意思??
  • CEP 模式没有检测到任何东西,输出流是空的

标签: scala apache-kafka apache-flink flink-cep


【解决方案1】:

使用pattern.get("first"),您正在从模式序列中选择一个名为“first”的模式,但模式序列只有一个模式,名为“start”。尝试将“first”更改为“start”。

此外,CEP 必须能够按时间顺序对流进行排序,以便进行模式匹配。您应该定义水印策略。对于处理时间语义,您可以使用WatermarkStrategy.noWatermarks()

【讨论】:

  • 在我的情况下如何添加水印层?
  • 您是否有时间戳或序列号字段可用于对事件进行排序?还是您想按照他们到达的顺序与他们合作?
  • 其实我想按照数据到达的顺序来处理。
  • 你使用的是什么版本的 Flink?
  • 在这种情况下,您可以使用 WatermarkStrategy.noWatermarks -- ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/…
猜你喜欢
  • 2016-09-04
  • 1970-01-01
  • 1970-01-01
  • 2017-08-18
  • 2019-02-18
  • 2018-12-31
  • 1970-01-01
  • 1970-01-01
  • 2019-05-28
相关资源
最近更新 更多