【问题标题】:Nothing is being printed out from a Flink Patterned StreamFlink Patterned Stream 没有打印任何内容
【发布时间】:2017-07-10 15:09:07
【问题描述】:

我有以下代码:

import java.util.Properties

import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WindowedWordCount {
  val configFactory = ConfigFactory.load()
  def main(args: Array[String]) = {
    val brokers = configFactory.getString("kafka.broker")
    val topicChannel1 = configFactory.getString("kafka.topic1")

    val props = new Properties()
    ...

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

    val partitionedInput = dataStream.keyBy(jsonString => {
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account")
    })

    val priceCheck = Pattern.begin[String]("start").where({jsonString =>
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account").toString == "iOS"})

    val pattern = CEP.pattern(partitionedInput, priceCheck)

    val newStream = pattern.select(x =>
      x.get("start").map({str =>
        str
      })
    )

    newStream.print()

    env.execute()
  }
}

由于某种原因,在newStream.print() 的上述代码中没有任何内容被打印出来。我很肯定 Kafka 中有与我在上面定义的模式相匹配的数据,但由于某种原因没有打印出来。

谁能帮我找出这段代码中的错误?

编辑:

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {

  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    val jsonParser = new JsonParser()
    val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
    Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis())
  }
}

我在 flink 文档中看到,您可以在 extractTimestamp 方法中返回 prevElementTimestamp(如果您使用的是 Kafka010),在 getCurrentWatermark 方法中返回 new Watermark(System.currentTimeMillis)

但我不明白prevElementTimestamp 是什么,也不明白为什么人们会将new Watermark(System.currentTimeMillis) 作为水印返回而不是其他东西。请您详细说明我们为什么要这样做WaterMarkEventTime如何协同工作?

【问题讨论】:

    标签: scala apache-flink


    【解决方案1】:

    您确实将您的工作设置为在EventTime 中工作,但您没有提供时间戳和水印提取器。

    有关在活动时间工作的更多信息,请参阅docs。如果您想使用 kafka 嵌入式时间戳,docs 可能会帮助您。

    EventTime 中,CEP 库在水印到达时缓冲事件,以便正确处理乱序事件。在您的情况下,没有生成水印,因此事件被无限缓冲。


    编辑:

    1. 对于prevElementTimestamp,我认为文档非常清楚:

      使用来自 Kafka 的时间戳时,无需定义时间戳提取器。 extractTimestamp() 方法的previousElementTimestamp 参数包含Kafka 消息携带的时间戳

      从 Kafka 0.10.x 开始,Kafka 消息可以嵌入时间戳。

    2. 在这种情况下将Watermark 生成为new Watermark(System.currentTimeMillis) 不是一个好主意。您应该根据您对数据的了解创建Watermark。有关WatermarkEventTime 如何协同工作的信息,我再清楚不过了docs

    【讨论】:

    • 我在我的帖子中添加了一个编辑。你能看一下吗?
    猜你喜欢
    • 2018-06-05
    • 1970-01-01
    • 1970-01-01
    • 2020-02-06
    • 2020-05-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多